Module malmoext.scenario
Expand source code
import malmo.MalmoPython as MalmoPython
from malmoext.malmo_bootstrap import MalmoBootstrap
from malmo.malmoutils import parse_command_line, get_default_recording_object
from malmoext.scenario_builder import ScenarioBuilder
from malmoext.agent import Agent
from abc import abstractmethod
import time
class Scenario:
'''A Scenario defines the construction of a Minecraft simulation, and the actions that one or more agents should perform
based on the changing state of that simulation over time.
Each implementation of a Scenario must implement the following two methods:
- build_scenario - Constructs the starting state of the simulation
- on_tick - Performs one or more agent actions on each simulation tick'''
def __init__(self):
self.__builder = ScenarioBuilder()
self.__agents = {} # type: dict[str, Agent]
@abstractmethod
def build_scenario(self, builder: ScenarioBuilder) -> None:
'''Method that constructs the details of a scenario. An empty ScenarioBuilder is provided as input to this method,
and is expected to be modified prior to returning.
Example implementation:
builder.set_description('Swing Sword')
builder.set_time_limit(30.0)
builder.add_agent('agent1')
builder.agents['agent1'].addInventory(Items.All.diamond_sword, Inventory.HotBar._0)
builder.agents['agent1'].set_position(Vector(0, 4, 0))
builder.world.addMob(Mobs.Zombie, Vector(5, 4, 0))
'''
pass
@abstractmethod
def on_tick(self, agents: 'dict[str, Agent]') -> None:
'''Method that is called on each clock tick, where any number of agent actions can be initiated. The full
list of agent actions can be found at https://github.com/NateRex/malmoext/blob/master/docs/agent_actions.md
The actions performed by an agent may depend on the current game state. Documentation on what state details
are observable can be found at https://github.com/NateRex/malmoext/blob/master/docs/scenario_state.md
Example implementation:
agent = getAgent()
mob = agent.closest_hostile_mob()
if mob != None:
if agent.lookAt(mob) and agent.moveTo(mob):
agent.attack()
else:
agent.doNothing()
'''
pass
def run(self, ports=[10000]) -> None:
'''Executes this scenario within the Malmo Minecraft instances running on the given ports. By default, a single
Malmo Minecraft instance running on port 10000 is assumed.
Documentation on how to run one or more Malmo Minecraft instances on different ports can be found at
https://github.com/NateRex/malmoext/blob/master/README.md#running-a-scenario
'''
# Initialize Malmo Platform environment
MalmoBootstrap.init_env()
# Construct scenario
self.build_scenario(self.__builder)
# Validate scenario
numAgents = len(self.__builder.agents)
if (len(ports) < numAgents):
raise Exception('Number of agents must not exceed the number of Malmo Minecraft instances currently running.')
if (numAgents == 0):
print('No agents present in scenario. Exiting.')
exit(0)
# Construct agents
clientPool = MalmoPython.ClientPool()
agentIdx = 0
agentZero = None
for builder in self.__builder.agents.values():
agent = Agent(builder)
self.__agents[agent.get_name()] = agent
clientPool.add(MalmoPython.ClientInfo('127.0.0.1', ports[agentIdx]))
if (agentZero is None):
agentZero = agent
agentIdx += 1
# Load the scenario
mission = MalmoPython.MissionSpec(self.__builder.build(), True)
parse_command_line(agentZero.get_host())
# Start the mission
agentIdx = 0
for agent in self.__agents.values():
recordingObject = get_default_recording_object(agentZero.get_host(), "agent_{}_viewpoint_continuous".format(agentIdx + 1))
self.__start_host_mission(agent, mission, clientPool, recordingObject, agentIdx, '')
agentIdx += 1
# Wait for mission to start
self.__wait_for_mission_start()
# While mission is running, repeatedly synchronize the local state with the remote server state,
# and execute agent actions (assume the time limit is the same across all agents)
while (agentZero.is_mission_active()):
# Avoid handing off control while we are still waiting to receive observations for one or more agents
if not self.__all_agents_have_observations():
continue
# Sync agent states
for agent in self.__agents.values():
agent._sync()
# Call handler to perform agent actions
self.on_tick(self.__agents)
time.sleep(0.05)
print('Mission has ended.')
def __start_host_mission(self, agent, mission, client_pool, recording, role, experimentId) -> None:
'''Attempts to start a mission for an agent host. Will automatically retry on failure. After multiple,
failures, an error will be reported and the program will exit.'''
used_attempts = 0
max_attempts = 5
print("Starting mission for agent ", role)
while True:
try:
agent.get_host().startMission(mission, client_pool, recording, role, experimentId)
break
except MalmoPython.MissionException as e:
errorCode = e.details.errorCode
if errorCode == MalmoPython.MissionErrorCode.MISSION_SERVER_WARMING_UP:
print("Server not quite ready yet - waiting...")
time.sleep(2)
elif errorCode == MalmoPython.MissionErrorCode.MISSION_INSUFFICIENT_CLIENTS_AVAILABLE:
print("Not enough available Minecraft instances running.")
used_attempts += 1
if used_attempts < max_attempts:
print("Will wait in case they are starting up.", max_attempts - used_attempts, "attempts left.")
time.sleep(2)
elif errorCode == MalmoPython.MissionErrorCode.MISSION_SERVER_NOT_FOUND:
print("Server not found - has the mission with role 0 been started yet?")
used_attempts += 1
if used_attempts < max_attempts:
print("Will wait and retry.", max_attempts - used_attempts, "attempts left.")
time.sleep(2)
else:
print("Other error:", e.message)
print("Waiting will not help here - bailing immediately.")
exit(1)
if used_attempts == max_attempts:
print("All chances used up - bailing now.")
exit(1)
print("startMission called okay.")
def __wait_for_mission_start(self) -> None:
'''This method will block execution until all given hosts have succesfully started their mission. If any host
fails to begin their mission, a timeout error will occur and the program will exit.'''
print("Waiting for the mission to start", end=' ')
agents = self.__agents.values()
start_flags = [False for a in agents]
start_time = time.time()
time_out = 120 # Allow two minutes for mission to start.
while not all(start_flags) and time.time() - start_time < time_out:
states = [a.get_host().peekWorldState() for a in agents]
start_flags = [w.has_mission_begun for w in states]
errors = [e for w in states for e in w.errors]
if len(errors) > 0:
print("Errors waiting for mission start:")
for e in errors:
print(e.text)
print("Bailing now.")
exit(1)
time.sleep(0.1)
print(".", end=' ')
print()
if time.time() - start_time >= time_out:
print("Timed out waiting for mission to begin. Bailing.")
exit(1)
print("Mission has started.")
def __all_agents_have_observations(self):
'''Returns true if all agents have a received a new observation from the server. Returns false otherwise.'''
for agent in self.__agents.values():
num_observations = agent.get_host().peekWorldState().number_of_observations_since_last_state
if num_observations == 0:
return False
return True
Classes
class Scenario
-
A Scenario defines the construction of a Minecraft simulation, and the actions that one or more agents should perform based on the changing state of that simulation over time.
Each implementation of a Scenario must implement the following two methods: - build_scenario - Constructs the starting state of the simulation - on_tick - Performs one or more agent actions on each simulation tick
Expand source code
class Scenario: '''A Scenario defines the construction of a Minecraft simulation, and the actions that one or more agents should perform based on the changing state of that simulation over time. Each implementation of a Scenario must implement the following two methods: - build_scenario - Constructs the starting state of the simulation - on_tick - Performs one or more agent actions on each simulation tick''' def __init__(self): self.__builder = ScenarioBuilder() self.__agents = {} # type: dict[str, Agent] @abstractmethod def build_scenario(self, builder: ScenarioBuilder) -> None: '''Method that constructs the details of a scenario. An empty ScenarioBuilder is provided as input to this method, and is expected to be modified prior to returning. Example implementation: builder.set_description('Swing Sword') builder.set_time_limit(30.0) builder.add_agent('agent1') builder.agents['agent1'].addInventory(Items.All.diamond_sword, Inventory.HotBar._0) builder.agents['agent1'].set_position(Vector(0, 4, 0)) builder.world.addMob(Mobs.Zombie, Vector(5, 4, 0)) ''' pass @abstractmethod def on_tick(self, agents: 'dict[str, Agent]') -> None: '''Method that is called on each clock tick, where any number of agent actions can be initiated. The full list of agent actions can be found at https://github.com/NateRex/malmoext/blob/master/docs/agent_actions.md The actions performed by an agent may depend on the current game state. Documentation on what state details are observable can be found at https://github.com/NateRex/malmoext/blob/master/docs/scenario_state.md Example implementation: agent = getAgent() mob = agent.closest_hostile_mob() if mob != None: if agent.lookAt(mob) and agent.moveTo(mob): agent.attack() else: agent.doNothing() ''' pass def run(self, ports=[10000]) -> None: '''Executes this scenario within the Malmo Minecraft instances running on the given ports. By default, a single Malmo Minecraft instance running on port 10000 is assumed. Documentation on how to run one or more Malmo Minecraft instances on different ports can be found at https://github.com/NateRex/malmoext/blob/master/README.md#running-a-scenario ''' # Initialize Malmo Platform environment MalmoBootstrap.init_env() # Construct scenario self.build_scenario(self.__builder) # Validate scenario numAgents = len(self.__builder.agents) if (len(ports) < numAgents): raise Exception('Number of agents must not exceed the number of Malmo Minecraft instances currently running.') if (numAgents == 0): print('No agents present in scenario. Exiting.') exit(0) # Construct agents clientPool = MalmoPython.ClientPool() agentIdx = 0 agentZero = None for builder in self.__builder.agents.values(): agent = Agent(builder) self.__agents[agent.get_name()] = agent clientPool.add(MalmoPython.ClientInfo('127.0.0.1', ports[agentIdx])) if (agentZero is None): agentZero = agent agentIdx += 1 # Load the scenario mission = MalmoPython.MissionSpec(self.__builder.build(), True) parse_command_line(agentZero.get_host()) # Start the mission agentIdx = 0 for agent in self.__agents.values(): recordingObject = get_default_recording_object(agentZero.get_host(), "agent_{}_viewpoint_continuous".format(agentIdx + 1)) self.__start_host_mission(agent, mission, clientPool, recordingObject, agentIdx, '') agentIdx += 1 # Wait for mission to start self.__wait_for_mission_start() # While mission is running, repeatedly synchronize the local state with the remote server state, # and execute agent actions (assume the time limit is the same across all agents) while (agentZero.is_mission_active()): # Avoid handing off control while we are still waiting to receive observations for one or more agents if not self.__all_agents_have_observations(): continue # Sync agent states for agent in self.__agents.values(): agent._sync() # Call handler to perform agent actions self.on_tick(self.__agents) time.sleep(0.05) print('Mission has ended.') def __start_host_mission(self, agent, mission, client_pool, recording, role, experimentId) -> None: '''Attempts to start a mission for an agent host. Will automatically retry on failure. After multiple, failures, an error will be reported and the program will exit.''' used_attempts = 0 max_attempts = 5 print("Starting mission for agent ", role) while True: try: agent.get_host().startMission(mission, client_pool, recording, role, experimentId) break except MalmoPython.MissionException as e: errorCode = e.details.errorCode if errorCode == MalmoPython.MissionErrorCode.MISSION_SERVER_WARMING_UP: print("Server not quite ready yet - waiting...") time.sleep(2) elif errorCode == MalmoPython.MissionErrorCode.MISSION_INSUFFICIENT_CLIENTS_AVAILABLE: print("Not enough available Minecraft instances running.") used_attempts += 1 if used_attempts < max_attempts: print("Will wait in case they are starting up.", max_attempts - used_attempts, "attempts left.") time.sleep(2) elif errorCode == MalmoPython.MissionErrorCode.MISSION_SERVER_NOT_FOUND: print("Server not found - has the mission with role 0 been started yet?") used_attempts += 1 if used_attempts < max_attempts: print("Will wait and retry.", max_attempts - used_attempts, "attempts left.") time.sleep(2) else: print("Other error:", e.message) print("Waiting will not help here - bailing immediately.") exit(1) if used_attempts == max_attempts: print("All chances used up - bailing now.") exit(1) print("startMission called okay.") def __wait_for_mission_start(self) -> None: '''This method will block execution until all given hosts have succesfully started their mission. If any host fails to begin their mission, a timeout error will occur and the program will exit.''' print("Waiting for the mission to start", end=' ') agents = self.__agents.values() start_flags = [False for a in agents] start_time = time.time() time_out = 120 # Allow two minutes for mission to start. while not all(start_flags) and time.time() - start_time < time_out: states = [a.get_host().peekWorldState() for a in agents] start_flags = [w.has_mission_begun for w in states] errors = [e for w in states for e in w.errors] if len(errors) > 0: print("Errors waiting for mission start:") for e in errors: print(e.text) print("Bailing now.") exit(1) time.sleep(0.1) print(".", end=' ') print() if time.time() - start_time >= time_out: print("Timed out waiting for mission to begin. Bailing.") exit(1) print("Mission has started.") def __all_agents_have_observations(self): '''Returns true if all agents have a received a new observation from the server. Returns false otherwise.''' for agent in self.__agents.values(): num_observations = agent.get_host().peekWorldState().number_of_observations_since_last_state if num_observations == 0: return False return True
Methods
def build_scenario(self, builder: ScenarioBuilder) ‑> NoneType
-
Method that constructs the details of a scenario. An empty ScenarioBuilder is provided as input to this method, and is expected to be modified prior to returning.
Example implementation:
builder.set_description('Swing Sword') builder.set_time_limit(30.0) builder.add_agent('agent1') builder.agents['agent1'].addInventory(Items.All.diamond_sword, Inventory.HotBar._0) builder.agents['agent1'].set_position(Vector(0, 4, 0)) builder.world.addMob(Mobs.Zombie, Vector(5, 4, 0))
Expand source code
@abstractmethod def build_scenario(self, builder: ScenarioBuilder) -> None: '''Method that constructs the details of a scenario. An empty ScenarioBuilder is provided as input to this method, and is expected to be modified prior to returning. Example implementation: builder.set_description('Swing Sword') builder.set_time_limit(30.0) builder.add_agent('agent1') builder.agents['agent1'].addInventory(Items.All.diamond_sword, Inventory.HotBar._0) builder.agents['agent1'].set_position(Vector(0, 4, 0)) builder.world.addMob(Mobs.Zombie, Vector(5, 4, 0)) ''' pass
def on_tick(self, agents: dict[str, Agent])
-
Method that is called on each clock tick, where any number of agent actions can be initiated. The full list of agent actions can be found at https://github.com/NateRex/malmoext/blob/master/docs/agent_actions.md
The actions performed by an agent may depend on the current game state. Documentation on what state details are observable can be found at https://github.com/NateRex/malmoext/blob/master/docs/scenario_state.md
Example implementation:
agent = getAgent() mob = agent.closest_hostile_mob() if mob != None: if agent.lookAt(mob) and agent.moveTo(mob): agent.attack() else: agent.doNothing()
Expand source code
@abstractmethod def on_tick(self, agents: 'dict[str, Agent]') -> None: '''Method that is called on each clock tick, where any number of agent actions can be initiated. The full list of agent actions can be found at https://github.com/NateRex/malmoext/blob/master/docs/agent_actions.md The actions performed by an agent may depend on the current game state. Documentation on what state details are observable can be found at https://github.com/NateRex/malmoext/blob/master/docs/scenario_state.md Example implementation: agent = getAgent() mob = agent.closest_hostile_mob() if mob != None: if agent.lookAt(mob) and agent.moveTo(mob): agent.attack() else: agent.doNothing() ''' pass
def run(self, ports=[10000]) ‑> NoneType
-
Executes this scenario within the Malmo Minecraft instances running on the given ports. By default, a single Malmo Minecraft instance running on port 10000 is assumed.
Documentation on how to run one or more Malmo Minecraft instances on different ports can be found at https://github.com/NateRex/malmoext/blob/master/README.md#running-a-scenario
Expand source code
def run(self, ports=[10000]) -> None: '''Executes this scenario within the Malmo Minecraft instances running on the given ports. By default, a single Malmo Minecraft instance running on port 10000 is assumed. Documentation on how to run one or more Malmo Minecraft instances on different ports can be found at https://github.com/NateRex/malmoext/blob/master/README.md#running-a-scenario ''' # Initialize Malmo Platform environment MalmoBootstrap.init_env() # Construct scenario self.build_scenario(self.__builder) # Validate scenario numAgents = len(self.__builder.agents) if (len(ports) < numAgents): raise Exception('Number of agents must not exceed the number of Malmo Minecraft instances currently running.') if (numAgents == 0): print('No agents present in scenario. Exiting.') exit(0) # Construct agents clientPool = MalmoPython.ClientPool() agentIdx = 0 agentZero = None for builder in self.__builder.agents.values(): agent = Agent(builder) self.__agents[agent.get_name()] = agent clientPool.add(MalmoPython.ClientInfo('127.0.0.1', ports[agentIdx])) if (agentZero is None): agentZero = agent agentIdx += 1 # Load the scenario mission = MalmoPython.MissionSpec(self.__builder.build(), True) parse_command_line(agentZero.get_host()) # Start the mission agentIdx = 0 for agent in self.__agents.values(): recordingObject = get_default_recording_object(agentZero.get_host(), "agent_{}_viewpoint_continuous".format(agentIdx + 1)) self.__start_host_mission(agent, mission, clientPool, recordingObject, agentIdx, '') agentIdx += 1 # Wait for mission to start self.__wait_for_mission_start() # While mission is running, repeatedly synchronize the local state with the remote server state, # and execute agent actions (assume the time limit is the same across all agents) while (agentZero.is_mission_active()): # Avoid handing off control while we are still waiting to receive observations for one or more agents if not self.__all_agents_have_observations(): continue # Sync agent states for agent in self.__agents.values(): agent._sync() # Call handler to perform agent actions self.on_tick(self.__agents) time.sleep(0.05) print('Mission has ended.')