我正在使用 Pygame 在 python3 中制作经典的 atari 贪吃蛇游戏.我想生成一个子进程来监听击键,以便每当玩家输入一个键(向上、向下、向左或向右)时,子进程都会向父进程发送该键。但是这个管道不应该被阻塞,这样蛇就可以沿着它行进的方向行进,直到收到 key 。
我在 multi-processes 上找到了 Python 的官方文档,但它没有描述我想要的行为,或者至少没有记录示例用法是否被阻止。有人可以举例说明如何实现吗?
最佳答案
你说:
I want to create an interface for an AI to take control of the snake. It wouldn't be fair if the state of the game is simply passed to the AI on each iteration b/c it could then just take as long as it want to compute the next move. Hence why it should be synchronous and non-blocking.
所以为了得到你想要的,你需要一个抽象。在下面的示例中,我创建了一个执行此操作的 Controller
类。 KeyboardController
处理键盘输入,而 AsyncController
启动一个线程并使用 Queue
类传递游戏状态和周围“AI”的决定.请注意,您必须在主线程上获取 pygame 事件,因此我在主循环中执行此操作并将事件简单地传递给 Controller 。
您的 AI 必须由 worker
函数调用。如您所见,目前 worker 函数中的“AI”仅每 0.5 秒执行一次,而帧率为 120。AI 需要这么长时间来做出决定对游戏来说无关紧要。
代码如下:
import pygame
import time
import random
from queue import Queue, Empty
from threading import Thread
class Controller():
def __init__(self, color, message, actor):
self.color = color
self.message = message
if actor: self.attach(actor)
def attach(self, actor):
self.actor = actor
self.actor.controller = self
self.actor.image.fill(self.color)
class AsyncController(Controller):
def __init__(self, actor=None):
super().__init__(pygame.Color('orange'), "AI is in control.", actor)
self.out_queue = Queue()
self.in_queue = Queue()
t = Thread(target=self.worker)
t.daemon = True
t.start()
def update(self, events, dt):
for e in events:
if e.type == pygame.KEYDOWN:
if e.key == pygame.K_SPACE: self.actor.controller = KeyboardController(self.actor)
self.out_queue.put_nowait((self.actor, events, dt))
try: return self.in_queue.get_nowait()
except Empty: pass
def worker(self):
while True:
try:
actor, events, dt = self.out_queue.get_nowait()
if actor.rect.x < 100: self.in_queue.put_nowait(pygame.Vector2(1, 0))
if actor.rect.x > 600: self.in_queue.put_nowait(pygame.Vector2(-1, 0))
if actor.rect.y < 100: self.in_queue.put_nowait(pygame.Vector2(0, 1))
if actor.rect.y > 400: self.in_queue.put_nowait(pygame.Vector2(0, -1))
if random.randrange(1, 100) < 15:
self.in_queue.put_nowait(random.choice([
pygame.Vector2(1, 0),
pygame.Vector2(-1, 0),
pygame.Vector2(0, -1),
pygame.Vector2(0, 1)]))
time.sleep(0.5)
except Empty:
pass
class KeyboardController(Controller):
def __init__(self, actor=None):
super().__init__(pygame.Color('dodgerblue'), "You're in control.", actor)
def update(self, events, dt):
for e in events:
if e.type == pygame.KEYDOWN:
if e.key == pygame.K_SPACE: self.actor.controller = AsyncController(self.actor)
if e.key == pygame.K_UP: return pygame.Vector2(0, -1)
if e.key == pygame.K_DOWN: return pygame.Vector2(0, 1)
if e.key == pygame.K_LEFT: return pygame.Vector2(-1, 0)
if e.key == pygame.K_RIGHT: return pygame.Vector2(1, 0)
class Actor(pygame.sprite.Sprite):
def __init__(self):
super().__init__()
self.image = pygame.Surface((32, 32))
self.image.fill(pygame.Color('dodgerblue'))
self.rect = self.image.get_rect(center=(100, 100))
self.direction = pygame.Vector2(1, 0)
self.pos = self.rect.center
def update(self, events, dt):
new_direction = self.controller.update(events, dt)
if new_direction:
self.direction = new_direction
self.pos += (self.direction * dt * 0.2)
self.rect.center = self.pos
def main():
pygame.init()
actor = Actor()
sprites = pygame.sprite.Group(actor)
screen = pygame.display.set_mode([800,600])
clock = pygame.time.Clock()
font = pygame.font.SysFont("consolas", 20, True)
dt = 0
KeyboardController(actor)
while True:
events = pygame.event.get()
for e in events:
if e.type == pygame.QUIT:
return
sprites.update(events, dt)
screen.fill(pygame.Color('grey12'))
screen.blit(font.render(actor.controller.message + ' [SPACE] to change to keyboard control.', True, pygame.Color('white')), (10, 10))
sprites.draw(screen)
dt = clock.tick(120)
pygame.display.update()
if __name__ == '__main__':
main()
请注意,此实现使用了无限队列。您想要添加一些逻辑来清除队列,这样您的游戏就不会使用大量内存。
关于Python 3 非阻塞同步行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54209439/