actually implemented that RL

parent a891d51ca9
commit a53583b1d7
@@ -1,8 +1,6 @@
 import pygame
-import random
 import math
 import reinforcement_learning as rl
-import time
 
 # Initialize pygame
 pygame.init()
@@ -125,19 +123,14 @@ def move_pacman(pacman, a):
 # Main game function
 def main():
     global labyrinth
-    clock = pygame.time.Clock()
 
-    # Initialize Pacman and Ghost positions
-    pacman = Pacman(1, 1)
-    ghost = Ghost(COLS - 2, ROWS - 2)
 
-    s = (pacman.x, pacman.y, ghost.x, ghost.y)  # as a tuple so the state becomes hashable
     q = rl.q_init()
+    clock = pygame.time.Clock()
 
     # Game loop
     not_won = True
-    running = True
-    iter = 0
+    outer_iter = 0
     while not_won:
 
         labyrinth = [
@@ -147,20 +140,30 @@ def main():
            "#........#",
            "##########"
        ]
+        running = True
+        iter = 0
+
+        # Initialize Pacman and Ghost positions
+        pacman = Pacman(1, 1)
+        ghost = Ghost(COLS - 2, ROWS - 2)
+        s = (pacman.x, pacman.y, ghost.x, ghost.y)  # as a tuple so the state becomes hashable
 
        # Handle events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                not_won = False
 
-        while running:
+        print(outer_iter)
+        while running or iter < 100:
            screen.fill(BLACK)
            iter = iter + 1
 
            # Check for collisions (game over if ghost catches pacman)
            if pacman.x == ghost.x and pacman.y == ghost.y:
                print("Game Over! The ghost caught Pacman.")
+                outer_iter = outer_iter + 1
                running = False
+                break
 
            # Eat cookies
            if labyrinth[pacman.y][pacman.x] == ".":
@@ -171,25 +174,29 @@ def main():
                print("You Win! Pacman ate all the cookies.")
                running = False
                not_won = False
+                break
 
-            # Start of my code ######################################################################
+            # Q-Learning part ############################################################################
 
-            labyrinth_copy = [list(row) for row in labyrinth]  # Create proper deep copy
 
            a = rl.epsilon_greedy(q, s)  # 0 = Left; 1 = Right ; 2 = Up ; 3 = Down
-            s_new, r, labyrinth_copy = rl.take_action(s, a, labyrinth_copy)
+            s_new, r, labyrinth = rl.take_action(s, a, labyrinth)
+            # print(s)  # debugging
+            # print(q[s])  # debugging
 
-            q[s][a] += ALPHA * (r + GAMMA * rl.max_q(q, s_new, labyrinth_copy) - q[s][a])
+            q[s][a] += ALPHA * (r + GAMMA * rl.max_q(q, s_new, labyrinth) - q[s][a])
 
            s = s_new
 
-            # zumindest angeben wo der nächste punkt ist, ohne geist im zustand s.
-            # After everything was calculated; just move Pacman according to highest action a in Q-Table q.
            move_pacman(pacman, a)
 
            if iter % 3 == 0:
                # Ghost moves towards Pacman
                ghost.move_towards_pacman(pacman)
+            # Update state
+            s = (pacman.x, pacman.y, ghost.x, ghost.y)
 
+            # End of Q-Learning part ######################################################################
+
            # Draw the labyrinth, pacman, and ghost
            draw_labyrinth()
 
@@ -200,7 +207,9 @@ def main():
            pygame.display.flip()
 
            # Cap the frame rate
-            clock.tick(5)
+            # tick_speed = 100
+            tick_speed = 5 if outer_iter % 20 == 0 else 100
+            clock.tick(tick_speed)
 
    pygame.quit()
 
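For reference, the update applied inside the game loop above is the standard tabular Q-learning rule: the value of the chosen action is nudged toward the observed reward plus the discounted best value of the next state. Below is a minimal, self-contained sketch of that single step; the dictionary layout (state tuple mapped to a list of four action values) mirrors what q_init() builds, and ALPHA and GAMMA use the values set in the RL module, but the toy states and numbers are made up for illustration.

    # One tabular Q-learning step: Q(s, a) += ALPHA * (r + GAMMA * max_a' Q(s', a') - Q(s, a))
    ALPHA = 0.2   # learning rate, same value as in the diff
    GAMMA = 0.90  # discount factor, same value as in the diff

    # Toy Q-table: state tuple -> list of four action values (None would mark a wall-blocked action).
    q = {
        (1, 1, 8, 8): [0.0, 0.0, 0.0, 0.0],
        (2, 1, 8, 8): [0.5, 0.0, 0.0, 0.0],
    }
    s, a, r, s_new = (1, 1, 8, 8), 1, 1.0, (2, 1, 8, 8)  # took action 1 (right), got reward 1.0

    max_next = max(v for v in q[s_new] if v is not None)  # best value reachable from s_new
    q[s][a] += ALPHA * (r + GAMMA * max_next - q[s][a])
    print(q[s][a])  # 0.2 * (1.0 + 0.9 * 0.5 - 0.0) = 0.29, up to float rounding

The hunks that follow are from the reinforcement-learning module that main() imports as rl.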
@@ -6,7 +6,7 @@ ausweicht und somit vermeidet gefressen zu werden.
 """
 
 import numpy as np
-from collections import deque
+import random
 
 GAMMA = 0.90
 ALPHA = 0.2
@@ -16,7 +16,8 @@ def q_init():
 
    # Configuration
    NUM_ACTIONS = 4
-    INITIAL_Q_VALUE = 2.0  # Small value for initialization
+    RAND_Q_VALUES = [random.uniform(-0.1, 0.1), random.uniform(-0.1, 0.1), random.uniform(-0.1, 0.1), random.uniform(-0.1, 0.1)]
+    # print(RAND_Q_VALUES)  # debugging
 
    # Labyrinth layout
    labyrinth = [
@@ -50,7 +51,7 @@ def q_init():
 
                    # Assign all possible states a tuple of values
                    state_key = (s0, s1, s2, s3)
-                    q_values = [INITIAL_Q_VALUE] * NUM_ACTIONS
+                    q_values = RAND_Q_VALUES.copy()  # Create a copy for each state
 
                    # Check which actions are blocked by walls
                    # Action 0: move left (s0 - 1)
@@ -72,7 +73,7 @@ def q_init():
    # print(list(q_table.items())[:5])  # Uncomment to see the first 5 entries
    return q_table
 
-def epsilon_greedy(q, s, epsilon=0.025):
+def epsilon_greedy(q, s, epsilon=0.1):
    """
    Return which direction Pacman should move to using epsilon-greedy algorithm
    With probability epsilon, choose a random action. Otherwise choose the greedy action.
@@ -102,44 +103,15 @@ def epsilon_greedy(q, s, epsilon=0.025):
        elif a == 3:  # down
            s_test[1] += 1
 
-        # Check if this action would cause collision
-        if s_test[0] == s[2] and s_test[1] == s[3]:
-            continue  # Skip this action, try next highest Q-value
-
        return a
 
-def max_q(q, s_new, labyrinth, depth=0, max_depth=1):
-    """Calculate Q-values for all possible actions in state s_new and return the maximum"""
-    q_max = 0
-    for a in range(4):
-        if q[s_new][a] != None and s_new in q:  # Only consider valid (non-blocked) actions
-            s_test = tuple(list(s_new)[:2] + [s_new[2], s_new[3]])  # Keep ghost position
-            s_test_list = list(s_test)
-            if a == 0:  # left
-                s_test_list[0] -= 1
-            elif a == 1:  # right
-                s_test_list[0] += 1
-            elif a == 2:  # up
-                s_test_list[1] -= 1
-            elif a == 3:  # down
-                s_test_list[1] += 1
-            s_test = tuple(s_test_list)
-
-            if s_test in q and depth < max_depth:
-                q[s_new][a] += ALPHA * (calc_reward(s_test, labyrinth) + GAMMA * max_q(q, s_test, labyrinth, depth + 1, max_depth) - q[s_new][a])
-            q_max = max(q_max, q[s_new][a])
-
-    return q_max
-
 def calc_reward(s_new, labyrinth):
 
    # Reward for cookies; punish for not eating cookies
    r = 1.0 if labyrinth[s_new[1]][s_new[0]] == "." else -1.0
 
    return r
 
 def take_action(s, a, labyrinth):
-    # Use the labyrinth parameter (already updated from previous iterations)
    s_new = list(s)
    if a == 0:  # left
        s_new[0] -= 1
@@ -150,10 +122,30 @@ def take_action(s, a, labyrinth):
    if a == 3:  # down
        s_new[1] += 1
 
-    # Mark new Pacman position as eaten (if it's a cookie)
-    if labyrinth[s_new[1]][s_new[0]] == ".":
-        labyrinth[s_new[1]][s_new[0]] = " "
-    r = calc_reward(tuple(s_new), labyrinth)
+    # Check if action caused gameover (Pacman caught by ghost)
+    if s_new[0] == s_new[2] and s_new[1] == s_new[3]:
+        r = -100.0
+        print("Invalid action type shit")
+    else:
+        r = calc_reward(tuple(s_new), labyrinth)
+
+        # Mark new Pacman position as eaten (if it's a cookie)
+        if labyrinth[s_new[1]][s_new[0]] == ".":
+            # Convert string row to list, modify it, then convert back to string
+            row_list = list(labyrinth[s_new[1]])
+            row_list[s_new[0]] = " "
+            labyrinth[s_new[1]] = "".join(row_list)
 
    return tuple(s_new), r, labyrinth
 
+def max_q(q, s_new, labyrinth):
+    """Return the maximum Q-value among valid actions in state s_new"""
+    if s_new not in q:
+        return 0
+
+    q_max = 0
+    for a in range(4):
+        if q[s_new][a] is not None:  # Only consider valid (non-blocked) actions
+            q_max = max(q_max, q[s_new][a])
+
+    return q_max
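One detail worth noting in the rewritten take_action: the commit stops working on a per-frame list-of-lists copy of the maze and mutates the shared labyrinth directly, whose rows are strings. Since Python strings are immutable, eating a cookie now rebuilds the affected row instead of assigning into it. A standalone illustration of that idiom, with a made-up 3x3 maze:

    labyrinth = [
        "###",
        "#.#",
        "###",
    ]
    x, y = 1, 1  # cell Pacman just moved onto

    # labyrinth[y][x] = " " would raise TypeError: strings don't support item assignment.
    if labyrinth[y][x] == ".":
        row = list(labyrinth[y])      # "#.#" -> ['#', '.', '#']
        row[x] = " "                  # eat the cookie
        labyrinth[y] = "".join(row)   # write the rebuilt row back

    print(labyrinth)  # ['###', '# #', '###']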