Keep fighting
parent
a76d2c41d3
commit
ad40c248d3
|
|
@ -32,6 +32,10 @@ labyrinth = [
|
||||||
ROWS = len(labyrinth)
|
ROWS = len(labyrinth)
|
||||||
COLS = len(labyrinth[0])
|
COLS = len(labyrinth[0])
|
||||||
|
|
||||||
|
# Q-Learning Constants
|
||||||
|
GAMMA = 0.90
|
||||||
|
ALPHA = 0.2
|
||||||
|
|
||||||
# Initialize game screen
|
# Initialize game screen
|
||||||
screen = pygame.display.set_mode((COLS * CELL_SIZE, ROWS * CELL_SIZE))
|
screen = pygame.display.set_mode((COLS * CELL_SIZE, ROWS * CELL_SIZE))
|
||||||
pygame.display.set_caption("Micro-Pacman")
|
pygame.display.set_caption("Micro-Pacman")
|
||||||
|
|
@ -129,9 +133,6 @@ def main():
|
||||||
|
|
||||||
s = (pacman.x, pacman.y, ghost.x, ghost.y) # as a tuple so the state becomes hashable
|
s = (pacman.x, pacman.y, ghost.x, ghost.y) # as a tuple so the state becomes hashable
|
||||||
q = rl.q_init()
|
q = rl.q_init()
|
||||||
a_opposite_direction = {0: 1, 1: 0, 2: 3, 3: 2}
|
|
||||||
gamma = 0.90
|
|
||||||
alpha = 0.2
|
|
||||||
|
|
||||||
# Game loop
|
# Game loop
|
||||||
running = True
|
running = True
|
||||||
|
|
@ -144,10 +145,6 @@ def main():
|
||||||
if event.type == pygame.QUIT:
|
if event.type == pygame.QUIT:
|
||||||
running = False
|
running = False
|
||||||
|
|
||||||
if iter%3==0:
|
|
||||||
# Ghost moves towards Pacman
|
|
||||||
ghost.move_towards_pacman(pacman)
|
|
||||||
|
|
||||||
# Check for collisions (game over if ghost catches pacman)
|
# Check for collisions (game over if ghost catches pacman)
|
||||||
if pacman.x == ghost.x and pacman.y == ghost.y:
|
if pacman.x == ghost.x and pacman.y == ghost.y:
|
||||||
print("Game Over! The ghost caught Pacman.")
|
print("Game Over! The ghost caught Pacman.")
|
||||||
|
|
@ -163,26 +160,27 @@ def main():
|
||||||
running = False
|
running = False
|
||||||
|
|
||||||
# Start of my code
|
# Start of my code
|
||||||
s_not_terminal = True
|
|
||||||
labyrinth_copy = [list(row) for row in labyrinth] # Create proper deep copy
|
labyrinth_copy = [list(row) for row in labyrinth] # Create proper deep copy
|
||||||
|
s_not_terminal = True
|
||||||
a = None
|
a = None
|
||||||
iteration = 0
|
iteration = 0
|
||||||
max_iterations = 50 # Prevent infinite loops
|
max_iterations = 50 # Prevent infinite loops
|
||||||
|
|
||||||
while s_not_terminal and iteration < max_iterations:
|
while s_not_terminal and iteration < max_iterations:
|
||||||
iteration += 1
|
iteration += 1
|
||||||
print("s: " + str(s))
|
print("s: " + str(s)) # debugging
|
||||||
print("q[s] before action: " + str(q[s]))
|
print("q[s] before action: " + str(q[s])) # debugging
|
||||||
|
|
||||||
a = rl.epsilon_greedy(q, s) # 0 = Left; 1 = Right ; 2 = Up ; 3 = Down
|
a = rl.epsilon_greedy(q, s) # 0 = Left; 1 = Right ; 2 = Up ; 3 = Down
|
||||||
s_new, r, labyrinth_copy = rl.take_action(s, a, labyrinth_copy)
|
s_new, r, labyrinth_copy = rl.take_action(s, a, labyrinth_copy)
|
||||||
|
|
||||||
q[s][a] += round(alpha * (r + gamma * rl.max_q(q, s_new, labyrinth) - q[s][a]), 2)
|
q[s][a] += ALPHA * (r + GAMMA * rl.max_q(q, s_new, labyrinth_copy) - q[s][a])
|
||||||
# q[s_new][a_opposite_direction[a]] += round(alpha * (r + gamma * max(q[s]) - q[s_new][a_opposite_direction[a]]), 2)
|
|
||||||
|
|
||||||
s = s_new
|
s = s_new
|
||||||
|
|
||||||
if all("." not in row for row in labyrinth_copy):
|
if all("." not in row for row in labyrinth_copy):
|
||||||
s_not_terminal = False
|
s_not_terminal = False
|
||||||
|
q[s][a] = 10.0
|
||||||
|
|
||||||
# Check for collisions (game over if ghost catches pacman)
|
# Check for collisions (game over if ghost catches pacman)
|
||||||
if s[0] == s[2] and s[1] == s[3]:
|
if s[0] == s[2] and s[1] == s[3]:
|
||||||
|
|
@ -190,16 +188,20 @@ def main():
|
||||||
q[s][a] = 0.01
|
q[s][a] = 0.01
|
||||||
print("There was just a collision!!!")
|
print("There was just a collision!!!")
|
||||||
print("s: " + str(s))
|
print("s: " + str(s))
|
||||||
|
print("Crashed values now q[s]: " + str(q[s]))
|
||||||
|
|
||||||
time.sleep(0.025)
|
time.sleep(0.025)
|
||||||
|
|
||||||
if iteration >= max_iterations:
|
if iteration >= max_iterations:
|
||||||
print(f"Max iterations reached ({max_iterations}), breaking out of loop")
|
print(f"Max iterations reached breaking out of loop")
|
||||||
|
|
||||||
s = (pacman.x, pacman.y, ghost.x, ghost.y) # as a tuple so the state becomes hashable
|
s = (pacman.x, pacman.y, ghost.x, ghost.y) # as a tuple so the state becomes hashable
|
||||||
a = rl.epsilon_greedy(q, s) # 0 = Left; 1 = Right ; 2 = Up ; 3 = Down
|
a = rl.epsilon_greedy(q, s) # 0 = Left; 1 = Right ; 2 = Up ; 3 = Down
|
||||||
move_pacman(pacman, a)
|
move_pacman(pacman, a)
|
||||||
print("NEW LOOP")
|
|
||||||
|
if iter%3==0:
|
||||||
|
# Ghost moves towards Pacman
|
||||||
|
ghost.move_towards_pacman(pacman)
|
||||||
|
|
||||||
# Draw the labyrinth, pacman, and ghost
|
# Draw the labyrinth, pacman, and ghost
|
||||||
draw_labyrinth()
|
draw_labyrinth()
|
||||||
|
|
@ -216,17 +218,3 @@ def main():
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
||||||
"""
|
|
||||||
for state_key in q:
|
|
||||||
if state_key[0] == s_new[0] and state_key[1] == s_new[1]:
|
|
||||||
# Update this state's Q-values based on the current transition, but only if action is valid
|
|
||||||
if q[state_key][a] > 0: # Only update if action is not blocked
|
|
||||||
q[state_key][a] += round(alpha * (r + gamma * max(q[s_new]) - q[state_key][a]), 2)
|
|
||||||
if q[state_key][opposite_action[a]] > 0: # Only update if opposite action is not blocked
|
|
||||||
q[state_key][opposite_action[a]] += round(alpha * (r + gamma * max(q[s_new]) - q[state_key][opposite_action[a]]), 2)
|
|
||||||
print("s_new: " + str(s_new))
|
|
||||||
print("q[s] after action with manipulated a: " + str(q[s]))
|
|
||||||
print("q[s_new] after action: " + str(q[s_new]))
|
|
||||||
print()
|
|
||||||
"""
|
|
||||||
|
|
@ -6,6 +6,10 @@ ausweicht und somit vermeidet gefressen zu werden.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
GAMMA = 0.90
|
||||||
|
ALPHA = 0.2
|
||||||
|
|
||||||
def q_init():
|
def q_init():
|
||||||
""" Fill every possible action in every state with a small value for initialization"""
|
""" Fill every possible action in every state with a small value for initialization"""
|
||||||
|
|
@ -68,7 +72,7 @@ def q_init():
|
||||||
# print(list(q_table.items())[:5]) # Uncomment to see the first 5 entries
|
# print(list(q_table.items())[:5]) # Uncomment to see the first 5 entries
|
||||||
return q_table
|
return q_table
|
||||||
|
|
||||||
def epsilon_greedy(q, s, epsilon=0.2):
|
def epsilon_greedy(q, s, epsilon=0.1):
|
||||||
"""
|
"""
|
||||||
Return which direction Pacman should move to using epsilon-greedy algorithm
|
Return which direction Pacman should move to using epsilon-greedy algorithm
|
||||||
With probability epsilon, choose a random action. Otherwise choose the greedy action.
|
With probability epsilon, choose a random action. Otherwise choose the greedy action.
|
||||||
|
|
@ -76,12 +80,13 @@ def epsilon_greedy(q, s, epsilon=0.2):
|
||||||
Never allows Pacman to move backwards (opposite direction).
|
Never allows Pacman to move backwards (opposite direction).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
"""
|
||||||
q_max = max(q[s])
|
q_max = max(q[s])
|
||||||
a = q[s].index(q_max)
|
a = q[s].index(q_max)
|
||||||
|
|
||||||
return a
|
return a
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if np.random.random() < epsilon:
|
if np.random.random() < epsilon:
|
||||||
# Explore: choose random action (excluding blocked actions with Q=0)
|
# Explore: choose random action (excluding blocked actions with Q=0)
|
||||||
valid_actions = [i for i in range(len(q[s])) if q[s][i] > 0]
|
valid_actions = [i for i in range(len(q[s])) if q[s][i] > 0]
|
||||||
|
|
@ -98,59 +103,32 @@ def epsilon_greedy(q, s, epsilon=0.2):
|
||||||
return best_action
|
return best_action
|
||||||
else:
|
else:
|
||||||
return 0
|
return 0
|
||||||
"""
|
|
||||||
|
|
||||||
def bfs_distance(start, end, labyrinth):
|
def max_q(q, s_new, labyrinth, depth=0, max_depth=2):
|
||||||
"""
|
"""Calculate Q-values for all possible actions in state s_new and return the maximum"""
|
||||||
Calculate shortest path distance between two points using BFS.
|
q_max = 0.01
|
||||||
Returns the distance or infinity if no path exists.
|
|
||||||
"""
|
|
||||||
from collections import deque
|
|
||||||
|
|
||||||
if start == end:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
queue = deque([(start, 0)]) # (position, distance)
|
|
||||||
visited = {start}
|
|
||||||
|
|
||||||
while queue:
|
|
||||||
(x, y), dist = queue.popleft()
|
|
||||||
|
|
||||||
# Check all 4 directions
|
|
||||||
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
|
|
||||||
nx, ny = x + dx, y + dy
|
|
||||||
|
|
||||||
if (nx, ny) == end:
|
|
||||||
return round(dist + 1, 2)
|
|
||||||
|
|
||||||
if 0 <= ny < len(labyrinth) and 0 <= nx < len(labyrinth[0]):
|
|
||||||
if (nx, ny) not in visited and labyrinth[ny][nx] != "#":
|
|
||||||
visited.add((nx, ny))
|
|
||||||
queue.append(((nx, ny), dist + 1))
|
|
||||||
|
|
||||||
return float('inf') # No path found
|
|
||||||
|
|
||||||
def max_q(q, s_new, labyrinth):
|
|
||||||
"""Calculate the maximum reward for all possible actions in state s_new"""
|
|
||||||
max_reward = float('-inf')
|
|
||||||
for a in range(4):
|
for a in range(4):
|
||||||
if q[s_new][a] > 0: # Only consider valid (non-blocked) actions
|
if q[s_new][a] > 0: # Only consider valid (non-blocked) actions
|
||||||
s_test = list(s_new)
|
s_test = tuple(list(s_new)[:2] + [s_new[2], s_new[3]]) # Keep ghost position
|
||||||
|
s_test_list = list(s_test)
|
||||||
if a == 0: # left
|
if a == 0: # left
|
||||||
s_test[0] -= 1
|
s_test_list[0] -= 1
|
||||||
elif a == 1: # right
|
elif a == 1: # right
|
||||||
s_test[0] += 1
|
s_test_list[0] += 1
|
||||||
elif a == 2: # up
|
elif a == 2: # up
|
||||||
s_test[1] -= 1
|
s_test_list[1] -= 1
|
||||||
elif a == 3: # down
|
elif a == 3: # down
|
||||||
s_test[1] += 1
|
s_test_list[1] += 1
|
||||||
|
s_test = tuple(s_test_list)
|
||||||
|
|
||||||
reward = calc_reward(tuple(s_test), labyrinth)
|
if s_test in q and depth < max_depth:
|
||||||
max_reward = max(max_reward, reward)
|
q[s_new][a] += ALPHA * (calc_reward(s_test, labyrinth) + GAMMA * max_q(q, s_test, labyrinth, depth + 1, max_depth) - q[s_new][a])
|
||||||
|
q_max = max(q_max, q[s_new][a])
|
||||||
|
|
||||||
return max_reward if max_reward != float('-inf') else 0.0
|
return q_max
|
||||||
|
|
||||||
def calc_reward(s_new, labyrinth):
|
def calc_reward(s_new, labyrinth):
|
||||||
|
"""
|
||||||
# consider new distance between Pacman and Ghost using actual pathfinding
|
# consider new distance between Pacman and Ghost using actual pathfinding
|
||||||
pacman_pos_new = (s_new[0], s_new[1])
|
pacman_pos_new = (s_new[0], s_new[1])
|
||||||
ghost_pos = (s_new[2], s_new[3])
|
ghost_pos = (s_new[2], s_new[3])
|
||||||
|
|
@ -161,25 +139,20 @@ def calc_reward(s_new, labyrinth):
|
||||||
r = 0
|
r = 0
|
||||||
|
|
||||||
if distance_new < 3:
|
if distance_new < 3:
|
||||||
r = -2
|
r = -3
|
||||||
elif distance_new == 4:
|
elif distance_new == 4:
|
||||||
r = 0.5
|
r = 1.0
|
||||||
elif distance_new > 4:
|
elif distance_new > 4:
|
||||||
r = 1
|
r = 2.0
|
||||||
|
"""
|
||||||
|
|
||||||
# Reward for cookies
|
# Reward for cookies
|
||||||
r += 1.0 if labyrinth[s_new[1]][s_new[0]] == "." else -1.5
|
r = 1.0 if labyrinth[s_new[1]][s_new[0]] == "." else -2.0
|
||||||
|
|
||||||
# efficiency experiment
|
|
||||||
r -= 0.1
|
|
||||||
|
|
||||||
r = max(r, 0.01)
|
|
||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
def take_action(s, a, labyrinth):
|
def take_action(s, a, labyrinth):
|
||||||
labyrinth_copy = [list(row) for row in labyrinth]
|
# Use the labyrinth parameter (already updated from previous iterations)
|
||||||
labyrinth_copy[s[1]][s[0]] = " "
|
|
||||||
s_new = list(s)
|
s_new = list(s)
|
||||||
if a == 0: # left
|
if a == 0: # left
|
||||||
s_new[0] -= 1
|
s_new[0] -= 1
|
||||||
|
|
@ -190,6 +163,10 @@ def take_action(s, a, labyrinth):
|
||||||
if a == 3: # down
|
if a == 3: # down
|
||||||
s_new[1] += 1
|
s_new[1] += 1
|
||||||
|
|
||||||
r = calc_reward(s_new, labyrinth)
|
# Mark new Pacman position as eaten (if it's a cookie)
|
||||||
|
if labyrinth[s_new[1]][s_new[0]] == ".":
|
||||||
|
labyrinth[s_new[1]][s_new[0]] = " "
|
||||||
|
|
||||||
return tuple(s_new), r, labyrinth_copy
|
r = calc_reward(tuple(s_new), labyrinth)
|
||||||
|
|
||||||
|
return tuple(s_new), r, labyrinth
|
||||||
Loading…
Reference in New Issue