Module malmoext.agent_state

Expand source code
from typing import Any, Union
from malmoext.types import Block, Mob, Item, Inventory, Vector, Rotation, Entity, InventoryItem
from malmoext.utils import Utils
from malmoext.agent import Agent
import json

class AgentState:
    '''An AgentState represents the observable world from the perspective of a single agent.
    It provides an alternative, structured view of the JSON observation data provided by Malmo.

    Every field is parsed once, in the constructor. An instance is therefore a snapshot of the
    observation it was built from and is not updated afterwards.'''

    def __init__(self, agent: Agent):
        '''Constructor. Accepts the agent whose perspective this state represents.

        The last entry of the observations in the host's current world state is decoded as JSON and
        used to populate this state. The following keys are read from it: XPos, YPos, ZPos, Yaw,
        Pitch, blockgrid, nearby_entities, inventory and currentItemIndex.'''
        
        raw_state = agent.get_host().getWorldState()
        raw_data = json.loads(raw_state.observations[-1].text)

        self.__position = self.__parse_position(raw_data)
        self.__pov = self.__parse_pov_camera_angles(raw_data)
        self.__grid = self.__parse_grid(raw_data, agent.get_observable_distances())
        self.__nearby_entities = self.__parse_nearby_entities(raw_data)
        self.__inventory = self.__parse_inventory(raw_data)
        self.__equipped_slot = self.__parse_equipped_slot(raw_data)
        self.__recent_trade_positions = agent._get_recent_trade_positions()


    def get_position(self):
        '''Returns the current position of this agent'''
        return self.__position
    

    def get_pov(self):
        '''Returns the current camera angles for this agent's point-of-view (POV)'''
        return self.__pov


    def has_nearby_entity(self, aType: Union[Mob, Item]):
        '''Returns true if an entity with the given type exists within the agent's observable range.
        Returns false otherwise. Drop items lying near a position where the agent recently traded are
        ignored, exactly as in get_nearby_entity.'''

        return self.__get_entity_by_type(aType) is not None


    def get_nearby_entities(self, aType: Union[Mob, Item, None] = None):
        '''Returns the entities nearby the agent.

        When called without an argument, returns the dictionary containing all nearby entities,
        organized by type. When a type is given, returns the list of nearby entities of that type
        (an empty list if there are none).

        Unlike get_nearby_entity, no filtering of recently traded items is applied here.'''

        # A single method with an optional argument: Python has no overloading, so two separate
        # definitions with the same name would leave only the last one callable.
        if aType is None:
            return self.__nearby_entities

        return self.__nearby_entities.get(aType, [])


    def get_nearby_entity(self, aType: Union[Mob, Item, str]):
        '''Returns the closest entity to the agent, specified either by name or by type. Ignores any items that
        have been recently given to another entity. Returns None if no entity could be found using the information
        provided.'''

        if isinstance(aType, str):
            return self.__get_entity_by_name(aType)
        else:
            return self.__get_entity_by_type(aType)


    def __closest_entity(self, candidates):
        '''Returns the candidate entity closest to the agent, skipping drop items that lie near a recent
        trade position. Returns None if no candidate remains. Ties keep the first candidate seen.'''

        closest_sqrd_distance = None
        closest_entity = None
        for entity in candidates:
            if self.__is_item_near_recent_trade(entity):
                continue

            # Squared distances are sufficient for comparing which entity is closer
            sqrd_distance = Utils.squared_distance(self.__position, entity.position)
            if (closest_entity is None) or (sqrd_distance < closest_sqrd_distance):
                closest_sqrd_distance = sqrd_distance
                closest_entity = entity

        return closest_entity

    
    def __get_entity_by_type(self, mob_type: Union[Mob, Item]):
        '''Returns the closest entity to the agent containing the given type. Ignores any items that
        have been recently given to another entity. Returns None if no entity with that
        type exists within the agent's observable range.'''

        return self.__closest_entity(self.__nearby_entities.get(mob_type, []))
    

    def __get_entity_by_name(self, name: str):
        '''Returns the closest entity to the agent containing the given name. Ignores any items that
        have been recently given to another entity. Returns None if no entity with that name exists
        within the agent's observable range.'''

        matches = (
            entity
            for entities in self.__nearby_entities.values()
            for entity in entities
            if entity.name == name
        )
        return self.__closest_entity(matches)


    def __is_item_near_recent_trade(self, entity: Entity):
        '''Returns true if the given entity is a drop item that exists nearby a position where the agent
        recently performed a trade. Returns false otherwise.'''

        # NOTE(review): Item.contains is called with a raw name string while parsing entities but
        # with an already-parsed entity type here; confirm that it accepts both.
        if not Item.contains(entity.type):
            return False

        return any(
            Utils.distance(entity.position, pos) < Agent.TRADE_IGNORE_DISTANCE
            for pos in self.__recent_trade_positions
        )


    def get_nearby_block(self, rel_pos: Vector):
        '''Returns the type of block present at a location, defined in coordinates relative to the agent.
        
        For example:
        
            get_nearby_block(Vector(-1, 0, 0))
        
        would return the type one block away in the negative x direction.
        
        An exception (KeyError) will be thrown if the caller attempts to access a block outside the
        observable range of the agent.'''

        return self.__grid[rel_pos]


    def has_inventory_item(self, item_type: Item):
        '''Returns true if the given item exists in the agent's inventory. Returns false otherwise.'''

        return item_type in self.__inventory


    def get_inventory_item(self, item_type: Item):
        '''Searches the agent inventory for an item of the given type and returns the instance whose slot
        has the lowest value (the first such instance on ties). Returns None if the agent does not have
        that item.

        This presumably yields hotbar, then main inventory, then armor slots, assuming the slot values
        are numbered in that order.'''

        instances = self.__inventory.get(item_type)
        if not instances:
            return None

        return min(instances, key=lambda instance: instance.slot.value)


    def get_currently_equipped_slot(self):
        '''Returns the inventory slot currently equipped by the agent.'''

        return self.__equipped_slot


    def get_available_hotbar_slot(self):
        '''Returns the next available hotbar slot for this agent that does not currently contain an item. Returns
        None if all hotbar slots are currently occupied.'''
        
        in_use = {
            inv_item.slot
            for instances in self.__inventory.values()
            for inv_item in instances
        }

        for slot in Inventory.HotBar:
            if slot not in in_use:
                return slot
            
        return None


    def __parse_position(self, raw_data):
        '''Parses a raw observation object to determine the current position of the agent.'''
        
        return Vector(raw_data['XPos'], raw_data['YPos'], raw_data['ZPos'])
    

    def __parse_pov_camera_angles(self, raw_data):
        '''Parses a raw observation object to determine the current camera angles of the agent'''

        # Normalize yaw to the range [0, 360)
        yaw = raw_data['Yaw']
        yaw = (yaw + 360) % 360
        
        return Rotation(yaw, raw_data['Pitch'])


    def __parse_nearby_entities(self, raw_data):
        '''Parses a raw observation object to determine all entities near the agent. An entity is defined as a mob,
        a drop item, or another agent.
        
        Returns a dictionary containing all nearby entities to the agent, organized by type.'''
        
        entities = {}     # type: dict[Union[Mob, Item], list[Entity]]
        for obj in raw_data['nearby_entities']:

            if Item.contains(obj['name']):
                eType = Item(obj['name'])
            elif Mob.contains(obj['name']):
                eType = Mob(obj['name'])
            else:
                # Assume entity is agent
                eType = Mob.agent

            ePos = Vector(obj['x'], obj['y'], obj['z'])
            # Entries without a 'quantity' key are treated as a single unit
            entity = Entity(obj['id'], eType, obj['name'], ePos, obj.get('quantity', 1))
            Utils.add_or_append(entities, eType, entity)
        
        return entities


    def __parse_grid(self, raw_data: Any, observable_distances: Vector):
        '''Parses a raw observation object to determine the 3-dimensional grid of blocks surrounding the
        agent. The resulting grid is indexed using block locations relative to the agent.

        Raises an Exception if the number of blocks received does not match the size implied by the
        observable distances.'''
        
        raw_grid = raw_data['blockgrid']

        expected_size = (observable_distances.x * 2 + 1) * (observable_distances.y * 2 + 1) * (observable_distances.z * 2 + 1)
        actual_size = len(raw_grid)
        if (expected_size != actual_size):
            raise Exception('Block grid received from server did not match expected observation size')

        # Each axis spans [-distance, +distance] inclusive, i.e. (2 * distance + 1) cells, which is
        # what the size check above assumes. The upper bound therefore needs the "+ 1".
        # NOTE(review): the nesting below makes y the fastest-varying axis and x the slowest. Confirm
        # this against the ordering Malmo uses for grid observations.
        idx = 0
        grid = {}      # type: dict[Vector, Block]
        for x in range(-observable_distances.x, observable_distances.x + 1):
            for z in range(-observable_distances.z, observable_distances.z + 1):
                for y in range(-observable_distances.y, observable_distances.y + 1):
                    grid[Vector(x, y, z)] = Block(raw_grid[idx])
                    idx += 1
        return grid
    

    def __parse_inventory(self, raw_data: Any):
        '''Parses a raw observation object to determine the current inventory of an agent.
        
        The resulting dictionary contains all items in the agent's inventory, organized by type.

        Raises an Exception if an item reports a slot index that belongs to none of the hotbar, main
        inventory or armor slot groups.'''

        raw_inventory = raw_data['inventory']

        inventory = {}     # type: dict[Item, list[InventoryItem]]
        for obj in raw_inventory:
            iType = Item(obj['type'])
            index = obj['index']

            # Determine inventory slot
            if Inventory.HotBar.contains(index):
                slot = Inventory.HotBar(index)
            elif Inventory.Main.contains(index):
                slot = Inventory.Main(index)
            elif Inventory.Armor.contains(index):
                slot = Inventory.Armor(index)
            else:
                # Without this branch the item would silently reuse the previous item's slot
                # (or fail with an unbound variable on the first item).
                raise Exception('Inventory item has an unrecognized slot index: ' + str(index))

            inventoryItem = InventoryItem(iType, obj['quantity'], slot)
            Utils.add_or_append(inventory, iType, inventoryItem)

        return inventory


    def __parse_equipped_slot(self, raw_data: Any):
        '''Parses a raw observation object to determine the currently equipped inventory slot of the agent.'''

        index = raw_data['currentItemIndex']

        if Inventory.HotBar.contains(index):
            return Inventory.HotBar(index)
        
        elif Inventory.Main.contains(index):
            return Inventory.Main(index)
        
        else:
            # Any index outside the hotbar and main inventory is treated as an armor slot
            return Inventory.Armor(index)

Classes

class AgentState (agent: Agent)

An AgentState represents the observable world from the perspective of a single agent. It provides an alternative representation of the JSON data provided by Malmo.

Constructor. Accepts the agent whose perspective this state represents.

Expand source code
class AgentState:
    '''An AgentState represents the observable world from the perspective of a single agent.
    It provides an alternative, structured view of the JSON observation data provided by Malmo.

    Every field is parsed once, in the constructor. An instance is therefore a snapshot of the
    observation it was built from and is not updated afterwards.'''

    def __init__(self, agent: Agent):
        '''Constructor. Accepts the agent whose perspective this state represents.

        The last entry of the observations in the host's current world state is decoded as JSON and
        used to populate this state. The following keys are read from it: XPos, YPos, ZPos, Yaw,
        Pitch, blockgrid, nearby_entities, inventory and currentItemIndex.'''
        
        raw_state = agent.get_host().getWorldState()
        raw_data = json.loads(raw_state.observations[-1].text)

        self.__position = self.__parse_position(raw_data)
        self.__pov = self.__parse_pov_camera_angles(raw_data)
        self.__grid = self.__parse_grid(raw_data, agent.get_observable_distances())
        self.__nearby_entities = self.__parse_nearby_entities(raw_data)
        self.__inventory = self.__parse_inventory(raw_data)
        self.__equipped_slot = self.__parse_equipped_slot(raw_data)
        self.__recent_trade_positions = agent._get_recent_trade_positions()


    def get_position(self):
        '''Returns the current position of this agent'''
        return self.__position
    

    def get_pov(self):
        '''Returns the current camera angles for this agent's point-of-view (POV)'''
        return self.__pov


    def has_nearby_entity(self, aType: Union[Mob, Item]):
        '''Returns true if an entity with the given type exists within the agent's observable range.
        Returns false otherwise. Drop items lying near a position where the agent recently traded are
        ignored, exactly as in get_nearby_entity.'''

        return self.__get_entity_by_type(aType) is not None


    def get_nearby_entities(self, aType: Union[Mob, Item, None] = None):
        '''Returns the entities nearby the agent.

        When called without an argument, returns the dictionary containing all nearby entities,
        organized by type. When a type is given, returns the list of nearby entities of that type
        (an empty list if there are none).

        Unlike get_nearby_entity, no filtering of recently traded items is applied here.'''

        # A single method with an optional argument: Python has no overloading, so two separate
        # definitions with the same name would leave only the last one callable.
        if aType is None:
            return self.__nearby_entities

        return self.__nearby_entities.get(aType, [])


    def get_nearby_entity(self, aType: Union[Mob, Item, str]):
        '''Returns the closest entity to the agent, specified either by name or by type. Ignores any items that
        have been recently given to another entity. Returns None if no entity could be found using the information
        provided.'''

        if isinstance(aType, str):
            return self.__get_entity_by_name(aType)
        else:
            return self.__get_entity_by_type(aType)


    def __closest_entity(self, candidates):
        '''Returns the candidate entity closest to the agent, skipping drop items that lie near a recent
        trade position. Returns None if no candidate remains. Ties keep the first candidate seen.'''

        closest_sqrd_distance = None
        closest_entity = None
        for entity in candidates:
            if self.__is_item_near_recent_trade(entity):
                continue

            # Squared distances are sufficient for comparing which entity is closer
            sqrd_distance = Utils.squared_distance(self.__position, entity.position)
            if (closest_entity is None) or (sqrd_distance < closest_sqrd_distance):
                closest_sqrd_distance = sqrd_distance
                closest_entity = entity

        return closest_entity

    
    def __get_entity_by_type(self, mob_type: Union[Mob, Item]):
        '''Returns the closest entity to the agent containing the given type. Ignores any items that
        have been recently given to another entity. Returns None if no entity with that
        type exists within the agent's observable range.'''

        return self.__closest_entity(self.__nearby_entities.get(mob_type, []))
    

    def __get_entity_by_name(self, name: str):
        '''Returns the closest entity to the agent containing the given name. Ignores any items that
        have been recently given to another entity. Returns None if no entity with that name exists
        within the agent's observable range.'''

        matches = (
            entity
            for entities in self.__nearby_entities.values()
            for entity in entities
            if entity.name == name
        )
        return self.__closest_entity(matches)


    def __is_item_near_recent_trade(self, entity: Entity):
        '''Returns true if the given entity is a drop item that exists nearby a position where the agent
        recently performed a trade. Returns false otherwise.'''

        # NOTE(review): Item.contains is called with a raw name string while parsing entities but
        # with an already-parsed entity type here; confirm that it accepts both.
        if not Item.contains(entity.type):
            return False

        return any(
            Utils.distance(entity.position, pos) < Agent.TRADE_IGNORE_DISTANCE
            for pos in self.__recent_trade_positions
        )


    def get_nearby_block(self, rel_pos: Vector):
        '''Returns the type of block present at a location, defined in coordinates relative to the agent.
        
        For example:
        
            get_nearby_block(Vector(-1, 0, 0))
        
        would return the type one block away in the negative x direction.
        
        An exception (KeyError) will be thrown if the caller attempts to access a block outside the
        observable range of the agent.'''

        return self.__grid[rel_pos]


    def has_inventory_item(self, item_type: Item):
        '''Returns true if the given item exists in the agent's inventory. Returns false otherwise.'''

        return item_type in self.__inventory


    def get_inventory_item(self, item_type: Item):
        '''Searches the agent inventory for an item of the given type and returns the instance whose slot
        has the lowest value (the first such instance on ties). Returns None if the agent does not have
        that item.

        This presumably yields hotbar, then main inventory, then armor slots, assuming the slot values
        are numbered in that order.'''

        instances = self.__inventory.get(item_type)
        if not instances:
            return None

        return min(instances, key=lambda instance: instance.slot.value)


    def get_currently_equipped_slot(self):
        '''Returns the inventory slot currently equipped by the agent.'''

        return self.__equipped_slot


    def get_available_hotbar_slot(self):
        '''Returns the next available hotbar slot for this agent that does not currently contain an item. Returns
        None if all hotbar slots are currently occupied.'''
        
        in_use = {
            inv_item.slot
            for instances in self.__inventory.values()
            for inv_item in instances
        }

        for slot in Inventory.HotBar:
            if slot not in in_use:
                return slot
            
        return None


    def __parse_position(self, raw_data):
        '''Parses a raw observation object to determine the current position of the agent.'''
        
        return Vector(raw_data['XPos'], raw_data['YPos'], raw_data['ZPos'])
    

    def __parse_pov_camera_angles(self, raw_data):
        '''Parses a raw observation object to determine the current camera angles of the agent'''

        # Normalize yaw to the range [0, 360)
        yaw = raw_data['Yaw']
        yaw = (yaw + 360) % 360
        
        return Rotation(yaw, raw_data['Pitch'])


    def __parse_nearby_entities(self, raw_data):
        '''Parses a raw observation object to determine all entities near the agent. An entity is defined as a mob,
        a drop item, or another agent.
        
        Returns a dictionary containing all nearby entities to the agent, organized by type.'''
        
        entities = {}     # type: dict[Union[Mob, Item], list[Entity]]
        for obj in raw_data['nearby_entities']:

            if Item.contains(obj['name']):
                eType = Item(obj['name'])
            elif Mob.contains(obj['name']):
                eType = Mob(obj['name'])
            else:
                # Assume entity is agent
                eType = Mob.agent

            ePos = Vector(obj['x'], obj['y'], obj['z'])
            # Entries without a 'quantity' key are treated as a single unit
            entity = Entity(obj['id'], eType, obj['name'], ePos, obj.get('quantity', 1))
            Utils.add_or_append(entities, eType, entity)
        
        return entities


    def __parse_grid(self, raw_data: Any, observable_distances: Vector):
        '''Parses a raw observation object to determine the 3-dimensional grid of blocks surrounding the
        agent. The resulting grid is indexed using block locations relative to the agent.

        Raises an Exception if the number of blocks received does not match the size implied by the
        observable distances.'''
        
        raw_grid = raw_data['blockgrid']

        expected_size = (observable_distances.x * 2 + 1) * (observable_distances.y * 2 + 1) * (observable_distances.z * 2 + 1)
        actual_size = len(raw_grid)
        if (expected_size != actual_size):
            raise Exception('Block grid received from server did not match expected observation size')

        # Each axis spans [-distance, +distance] inclusive, i.e. (2 * distance + 1) cells, which is
        # what the size check above assumes. The upper bound therefore needs the "+ 1".
        # NOTE(review): the nesting below makes y the fastest-varying axis and x the slowest. Confirm
        # this against the ordering Malmo uses for grid observations.
        idx = 0
        grid = {}      # type: dict[Vector, Block]
        for x in range(-observable_distances.x, observable_distances.x + 1):
            for z in range(-observable_distances.z, observable_distances.z + 1):
                for y in range(-observable_distances.y, observable_distances.y + 1):
                    grid[Vector(x, y, z)] = Block(raw_grid[idx])
                    idx += 1
        return grid
    

    def __parse_inventory(self, raw_data: Any):
        '''Parses a raw observation object to determine the current inventory of an agent.
        
        The resulting dictionary contains all items in the agent's inventory, organized by type.

        Raises an Exception if an item reports a slot index that belongs to none of the hotbar, main
        inventory or armor slot groups.'''

        raw_inventory = raw_data['inventory']

        inventory = {}     # type: dict[Item, list[InventoryItem]]
        for obj in raw_inventory:
            iType = Item(obj['type'])
            index = obj['index']

            # Determine inventory slot
            if Inventory.HotBar.contains(index):
                slot = Inventory.HotBar(index)
            elif Inventory.Main.contains(index):
                slot = Inventory.Main(index)
            elif Inventory.Armor.contains(index):
                slot = Inventory.Armor(index)
            else:
                # Without this branch the item would silently reuse the previous item's slot
                # (or fail with an unbound variable on the first item).
                raise Exception('Inventory item has an unrecognized slot index: ' + str(index))

            inventoryItem = InventoryItem(iType, obj['quantity'], slot)
            Utils.add_or_append(inventory, iType, inventoryItem)

        return inventory


    def __parse_equipped_slot(self, raw_data: Any):
        '''Parses a raw observation object to determine the currently equipped inventory slot of the agent.'''

        index = raw_data['currentItemIndex']

        if Inventory.HotBar.contains(index):
            return Inventory.HotBar(index)
        
        elif Inventory.Main.contains(index):
            return Inventory.Main(index)
        
        else:
            # Any index outside the hotbar and main inventory is treated as an armor slot
            return Inventory.Armor(index)

Methods

def get_available_hotbar_slot(self)

Returns the next available hotbar slot for this agent that does not currently contain an item. Returns None if all hotbar slots are currently occupied.

Expand source code
def get_available_hotbar_slot(self):
    '''Finds the first hotbar slot, in the iteration order of Inventory.HotBar, that is not occupied
    by any item in the agent's inventory. Returns None when every hotbar slot is taken.'''

    # Collect every slot that some inventory item currently sits in
    occupied = {item.slot for stack in self.__inventory.values() for item in stack}

    return next((slot for slot in Inventory.HotBar if slot not in occupied), None)
def get_currently_equipped_slot(self)

Returns the inventory hotbar slot currently equipped by the agent.

Expand source code
def get_currently_equipped_slot(self):
    '''Returns the inventory slot that the agent currently has equipped, as stored on this state.'''

    equipped = self.__equipped_slot
    return equipped
def get_inventory_item(self, item_type: Item)

Searches the agent inventory for an item of the given type. This method searches the agent's hotbar, main inventory, and armor slots, in that order and returns the first instance found. Returns None if the agent does not have that item.

Expand source code
def get_inventory_item(self, item_type: Item):
    '''Searches the agent inventory for an item of the given type and returns the instance whose slot
    has the lowest value (the first such instance on ties). Returns None if the agent does not have
    that item.

    This presumably yields hotbar, then main inventory, then armor slots, assuming the slot values
    are numbered in that order.'''

    instances = self.__inventory.get(item_type)
    if not instances:
        return None

    # min() keeps the first element among equal keys, matching a strict "<" scan
    return min(instances, key=lambda instance: instance.slot.value)
def get_nearby_block(self, rel_pos: Vector)

Returns the type of block present at a location, defined in coordinates relative to the agent.

For example:

get_nearby_block(Vector(-1, 0, 0))

would return the type one block away in the negative x direction.

An exception will be thrown if the caller attempts to access a block outside the observable range of the agent.

Expand source code
def get_nearby_block(self, rel_pos: Vector):
    '''Looks up the type of the block at a location given in coordinates relative to the agent.

    For example:

        get_nearby_block(Vector(-1, 0, 0))

    yields the type of the block one step away in the negative x direction.

    Requesting a location that is not part of the agent's observed grid raises an exception.'''

    block = self.__grid[rel_pos]
    return block
def get_nearby_entities(self, aType: Union[Mob, Item])

Returns a list containing all nearby entities of the given type.

Expand source code
def get_nearby_entities(self, aType: Union[Mob, Item, None] = None):
    '''Returns the entities nearby the agent.

    When called without an argument, returns the dictionary containing all nearby entities,
    organized by type. When a type is given, returns the list of nearby entities of that type
    (an empty list if there are none).'''

    # The argument is optional so that the "all entities" form stays callable: Python keeps only
    # the last definition of a method name, so a separate zero-argument variant would be shadowed.
    if aType is None:
        return self.__nearby_entities

    return self.__nearby_entities.get(aType, [])
def get_nearby_entity(self, aType: Union[Mob, Item, str])

Returns the closest entity to the agent, specified either by name or by type. Ignores any items that have been recently given to another entity. Returns None if no entity could be found using the information provided.

Expand source code
def get_nearby_entity(self, aType: Union[Mob, Item, str]):
    '''Finds the entity closest to the agent that matches the argument. A string argument is treated
    as an entity name; anything else is treated as an entity type. Items recently given to another
    entity are ignored. Returns None when nothing matches.'''

    # Pick the lookup strategy based on how the caller identified the entity
    lookup = self.__get_entity_by_name if isinstance(aType, str) else self.__get_entity_by_type
    return lookup(aType)
def get_position(self)

Returns the current position of this agent

Expand source code
def get_position(self):
    '''Gives the position of this agent as stored on this state.'''
    position = self.__position
    return position
def get_pov(self)

Returns the current camera angles for this agent's point-of-view (POV)

Expand source code
def get_pov(self):
    '''Gives the camera angles of this agent's point-of-view (POV) as stored on this state.'''
    pov = self.__pov
    return pov
def has_inventory_item(self, item_type: Item)

Returns true if the given item exists in the agent's inventory. Returns false otherwise.

Expand source code
def has_inventory_item(self, item_type: Item):
    '''Reports whether the agent's inventory holds an entry for the given item type: true if it
    does, false if it does not.'''

    held = self.__inventory
    return item_type in held
def has_nearby_entity(self, aType: Union[Mob, Item])

Returns true if an entity with the given type exists within the agent's observable range. Returns false otherwise.

Expand source code
def has_nearby_entity(self, aType: Union[Mob, Item]):
    '''Reports whether the by-type entity lookup finds an entity of the given type: true if it
    returns one, false if it returns None.'''

    closest = self.__get_entity_by_type(aType)
    return closest is not None