Python 3 非阻塞同步行为

标签 python python-3.x pygame multiprocessing synchronous

我正在使用 Pygame 在 python3 中制作经典的 atari 贪吃蛇游戏.我想生成一个子进程来监听击键,以便每当玩家输入一个键(向上、向下、向左或向右)时,子进程都会向父进程发送该键。但是这个管道不应该被阻塞,这样蛇就可以沿着它行进的方向行进,直到收到 key 。

我在 multi-processes 上找到了 Python 的官方文档,但它没有描述我想要的行为,或者至少没有记录示例用法是否被阻止。有人可以举例说明如何实现吗?

最佳答案

你说:

I want to create an interface for an AI to take control of the snake. It wouldn't be fair if the state of the game is simply passed to the AI on each iteration b/c it could then just take as long as it want to compute the next move. Hence why it should be synchronous and non-blocking.

所以为了得到你想要的,你需要一个抽象。在下面的示例中,我创建了一个执行此操作的 Controller 类。 KeyboardController 处理键盘输入,而 AsyncController 启动一个线程并使用 Queue 类传递游戏状态和周围“AI”的决定.请注意,您必须在主线程上获取 pygame 事件,因此我在主循环中执行此操作并将事件简单地传递给 Controller ​​。

您的 AI 必须由 worker 函数调用。如您所见,目前 worker 函数中的“AI”仅每 0.5 秒执行一次,而帧率为 120。AI 需要这么长时间来做出决定对游戏来说无关紧要。

代码如下:

import pygame
import time
import random
from queue import Queue, Empty
from threading import Thread

class Controller():
    def __init__(self, color, message, actor):
        self.color = color
        self.message = message
        if actor: self.attach(actor)

    def attach(self, actor):
        self.actor = actor
        self.actor.controller = self
        self.actor.image.fill(self.color)

class AsyncController(Controller):
    def __init__(self, actor=None):
        super().__init__(pygame.Color('orange'), "AI is in control.", actor)
        self.out_queue = Queue()
        self.in_queue  = Queue()
        t = Thread(target=self.worker)
        t.daemon = True
        t.start()

    def update(self, events, dt):
        for e in events:
            if e.type == pygame.KEYDOWN:
                if e.key == pygame.K_SPACE: self.actor.controller = KeyboardController(self.actor)

        self.out_queue.put_nowait((self.actor, events, dt))
        try: return self.in_queue.get_nowait()
        except Empty: pass

    def worker(self):
        while True:
            try:
                actor, events, dt = self.out_queue.get_nowait()
                if actor.rect.x < 100: self.in_queue.put_nowait(pygame.Vector2(1, 0))
                if actor.rect.x > 600: self.in_queue.put_nowait(pygame.Vector2(-1, 0))
                if actor.rect.y < 100: self.in_queue.put_nowait(pygame.Vector2(0, 1))
                if actor.rect.y > 400: self.in_queue.put_nowait(pygame.Vector2(0, -1))
                if random.randrange(1, 100) < 15:
                    self.in_queue.put_nowait(random.choice([
                        pygame.Vector2(1, 0),
                        pygame.Vector2(-1, 0),
                        pygame.Vector2(0, -1), 
                        pygame.Vector2(0, 1)]))

                time.sleep(0.5)
            except Empty:
                pass

class KeyboardController(Controller):
    def __init__(self, actor=None):
        super().__init__(pygame.Color('dodgerblue'), "You're in control.", actor)

    def update(self, events, dt):
        for e in events:
            if e.type == pygame.KEYDOWN:
                if e.key == pygame.K_SPACE: self.actor.controller = AsyncController(self.actor)
                if e.key == pygame.K_UP: return pygame.Vector2(0, -1)
                if e.key == pygame.K_DOWN: return pygame.Vector2(0, 1)
                if e.key == pygame.K_LEFT: return pygame.Vector2(-1, 0)
                if e.key == pygame.K_RIGHT: return pygame.Vector2(1, 0)

class Actor(pygame.sprite.Sprite):
    def __init__(self):
        super().__init__()
        self.image = pygame.Surface((32, 32))
        self.image.fill(pygame.Color('dodgerblue'))
        self.rect = self.image.get_rect(center=(100, 100))
        self.direction = pygame.Vector2(1, 0)
        self.pos = self.rect.center

    def update(self, events, dt):
        new_direction = self.controller.update(events, dt)
        if new_direction:
            self.direction = new_direction
        self.pos += (self.direction * dt * 0.2)
        self.rect.center = self.pos

def main():
    pygame.init()

    actor   = Actor()
    sprites = pygame.sprite.Group(actor)
    screen  = pygame.display.set_mode([800,600])
    clock   = pygame.time.Clock()
    font    = pygame.font.SysFont("consolas", 20, True)
    dt      = 0
    KeyboardController(actor)

    while True:
        events = pygame.event.get()
        for e in events:
            if e.type == pygame.QUIT:
                return

        sprites.update(events, dt)
        screen.fill(pygame.Color('grey12'))
        screen.blit(font.render(actor.controller.message + ' [SPACE] to change to keyboard control.', True, pygame.Color('white')), (10, 10))
        sprites.draw(screen)
        dt = clock.tick(120)
        pygame.display.update()

if __name__ == '__main__':
    main()

enter image description here

请注意,此实现使用了无限队列。您想要添加一些逻辑来清除队列,这样您的游戏就不会使用大量内存。

关于Python 3 非阻塞同步行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54209439/

相关文章:

mysql - 在 Hadoop 中使用 apache Airflow 配置 MySql 时遇到问题

python - Pygame 简单程序测试显示卡住

python - 如何让 venv 访问 PyCharm 中的 pygame 模块?

python - 具有只读共享内存的 Python 中的多处理?

python - TypeError: 'tuple' 对象在尝试调用方法时不可调用

python - Django 管理站点 -- 查看路径 "admin/"at "/"

python - 自动完成并不总是适用于 Pycharm

python - 如何将类属性传递给方法装饰器?

python - 在 python 中以编程方式执行和终止长时间运行的批处理

python - 在 python/pandas 中将年/月转换为年/月/日