from typing import List
from typing import Optional
from pams.order import OrderKind
[docs]class Log:
"""Log class."""
[docs] def read_and_write(self, logger: "Logger") -> None:
"""writing a log. (The actual writing timing depends on the logger.)
Args:
logger (:class:`pams.logs.Logger`): logger.
Returns:
None
"""
logger.write(log=self)
[docs] def read_and_write_with_direct_process(self, logger: "Logger") -> None:
"""direct writing a log. (This method is intended to call logger to write the log immediately.)
Args:
logger (:class:`pams.logs.Logger`): logger.
Returns:
None
"""
logger.write_and_direct_process(log=self)
[docs]class OrderLog(Log):
"""Order log class.
This log is usually generated when an order is accepted by markets.
"""
def __init__(
self,
order_id: int,
market_id: int,
time: int,
agent_id: int,
is_buy: bool,
kind: OrderKind,
volume: int,
price: Optional[float] = None,
ttl: Optional[int] = None,
):
"""initialize.
Args:
order_id (int): order ID.
market_id (int): market ID.
time (int): time.
agent_id (int): agent ID.
is_buy (bool): whether it is a buy order or not.
kind (:class:`pams.order.OrderKind`): kind of order.
volume (int): order volume.
price (float, Optional): order price.
ttl (int, Optional): time to order expiration.
"""
self.order_id: int = order_id
self.market_id: int = market_id
self.time: int = time
self.agent_id: int = agent_id
self.is_buy: bool = is_buy
self.kind: OrderKind = kind
self.price: Optional[float] = price
self.volume: int = volume
self.ttl: Optional[int] = ttl
# TODO: Type validation
[docs]class CancelLog(Log):
"""Cancel type log class.
This log is usually generated when a cancel order is accepted by markets.
"""
# ToDo: check ttl is necessary or not
def __init__(
self,
order_id: int,
market_id: int,
cancel_time: int,
order_time: int,
agent_id: int,
is_buy: bool,
kind: OrderKind,
volume: int,
price: Optional[float] = None,
ttl: Optional[int] = None,
):
"""initialize.
Args:
order_id (int): order ID.
market_id (int): market ID.
cancel_time (int): time to cancel.
order_time (int): time to order.
agent_id (int): agent ID.
is_buy (bool): whether it is a buy order or not.
kind (:class:`pams.order.OrderKind`): kind of order.
volume (int): order volume.
price (float, Optional): order price.
ttl (int, Optional): time to cancel expiration.
"""
self.order_id: int = order_id
self.market_id: int = market_id
self.cancel_time: int = cancel_time
self.order_time: int = order_time
self.agent_id: int = agent_id
self.is_buy: bool = is_buy
self.kind: OrderKind = kind
self.price: Optional[float] = price
self.volume: int = volume
self.ttl: Optional[int] = ttl
# TODO: Type validation
[docs]class ExecutionLog(Log):
"""Execution type log class.
This log is usually generated when an order is executed on markets.
"""
def __init__(
self,
market_id: int,
time: int,
buy_agent_id: int,
sell_agent_id: int,
buy_order_id: int,
sell_order_id: int,
price: float,
volume: int,
):
"""initialize.
Args:
market_id (int): market ID.
time (int): time to execute.
buy_agent_id (int): buyer agent ID.
sell_agent_id (int): seller agent ID.
buy_order_id (int): buy order ID.
sell_order_id (int): sell order ID.
price (float): executed price.
volume (int): executed volume.
"""
self.market_id: int = market_id
self.time: int = time
self.buy_agent_id: int = buy_agent_id
self.sell_agent_id: int = sell_agent_id
self.buy_order_id: int = buy_order_id
self.sell_order_id: int = sell_order_id
self.price: float = price
self.volume: int = volume
# TODO: Type validation
[docs]class SimulationBeginLog(Log):
"""Simulation beginning log class."""
def __init__(self, simulator: "Simulator"): # type: ignore # NOQA
"""initialize.
Args:
simulator (:class:`pams.Simulator`): simulator.
"""
self.simulator = simulator
[docs]class SimulationEndLog(Log):
"""Simulation ending log class."""
def __init__(self, simulator: "Simulator"): # type: ignore # NOQA
"""initialize.
Args:
simulator (:class:`pams.Simulator`): simulator.
"""
self.simulator = simulator
[docs]class SessionBeginLog(Log):
"""Session beginning log class."""
def __init__(self, session: "Session", simulator: "Simulator"): # type: ignore # NOQA
"""initialize.
Args:
session (:class:`pams.Session`): session.
simulator (:class:`pams.Simulator`): simulator.
"""
self.simulator = simulator
self.session = session
[docs]class SessionEndLog(Log):
"""Session ending log class."""
def __init__(self, session: "Session", simulator: "Simulator"): # type: ignore # NOQA
"""initialize.
Args:
session (:class:`pams.Session`): session.
simulator (:class:`pams.Simulator`): simulator.
"""
self.simulator = simulator
self.session = session
[docs]class MarketStepBeginLog(Log):
"""Market step beginning log class."""
def __init__(self, session: "Session", market: "Market", simulator: "Simulator"): # type: ignore # NOQA
"""initialize.
Args:
session (:class:`pams.Session`): session.
market (:class:`pams.Market`): market.
simulator (:class:`pams.Simulator`): simulator.
"""
self.session = session
self.market = market
self.simulator = simulator
[docs]class MarketStepEndLog(Log):
"""Market step ending log class."""
def __init__(self, session: "Session", market: "Market", simulator: "Simulator"): # type: ignore # NOQA
"""initialize.
Args:
session (:class:`pams.Session`): session.
market (:class:`pams.Market`): market.
simulator (:class:`pams.Simulator`): simulator.
"""
self.session = session
self.market = market
self.simulator = simulator
[docs]class Logger:
"""Logger class.
Logger is designed to handling parallelized market simulations.
In the parallelized simulation, the order that logs are pushed to this logger is not deterministic
because of varied computational time.
For example, there are 2 markets, market 1 and market 2, and those markets are run in parallel,
the computational time of one step for each market are not fixed.
Therefore, the logging sequence of market 1 and market 2 is not always the same.
For handling this problem, logger should save the logs from those markets and the end of each step,
the logs are processed and written.
However, for some implementation, non-blocking log writing should be also supported.
Therefore, this logger has 2 methods to handling logs -- :func:`write` and :func:`write_and_direct_process`
"""
simulator: "Simulator" # type: ignore # NOQA
def __init__(self) -> None:
"""initialize logger."""
self.pending_logs: List[Log] = []
def _set_simulator(self, simulator: "Simulator") -> None: # type: ignore # NOQA
"""set simulator.
Args:
simulator (:class:`pams.Simulator`): simulator.
Returns:
None
"""
self.simulator = simulator
[docs] def write(self, log: "Log") -> None:
"""set a log to pending list.
Args:
log (:class:`pams.logs.Log`): log.
Returns:
None
"""
self.pending_logs.append(log)
[docs] def bulk_write(self, logs: List["Log"]) -> None:
"""set some logs to pending list.
Args:
logs (List[:class:`pams.logs.Log`]): log list.
Returns:
None
"""
self.pending_logs.extend(logs)
[docs] def write_and_direct_process(self, log: "Log") -> None:
"""direct writing a log.
Args:
log (:class:`pams.logs.Log`): log.
Returns:
None
"""
self.process(logs=[log])
[docs] def bulk_write_and_direct_process(self, logs: List["Log"]) -> None:
"""direct writing some logs.
Args:
logs (List[:class:`pams.logs.Log`]): log list.
Returns:
None
"""
self.process(logs=logs)
def _process(self) -> None:
self.process(logs=self.pending_logs)
self.pending_logs = []
[docs] def process(self, logs: List["Log"]) -> None:
"""logging execution. For each log type, each processing method are implemented.
For usual implementation, please use
- :func:`process_order_log`
- :func:`process_cancel_log`
- :func:`process_execution_log`
- :func:`process_simulation_begin_log`
- :func:`process_simulation_end_log`
- :func:`process_session_begin_log`
- :func:`process_session_end_log`
- :func:`process_market_step_begin_log`
- :func:`process_market_step_end_log`
However, if you want to control the log writing sequence, you can change this implementation.
Args:
logs (List[:class:`pams.logs.Log`]): log list.
Returns:
None
"""
for log in logs:
if isinstance(log, OrderLog):
self.process_order_log(log=log)
elif isinstance(log, CancelLog):
self.process_cancel_log(log=log)
elif isinstance(log, ExecutionLog):
self.process_execution_log(log=log)
elif isinstance(log, SimulationBeginLog):
self.process_simulation_begin_log(log=log)
elif isinstance(log, SimulationEndLog):
self.process_simulation_end_log(log=log)
elif isinstance(log, SessionBeginLog):
self.process_session_begin_log(log=log)
elif isinstance(log, SessionEndLog):
self.process_session_end_log(log=log)
elif isinstance(log, MarketStepBeginLog):
self.process_market_step_begin_log(log=log)
elif isinstance(log, MarketStepEndLog):
self.process_market_step_end_log(log=log)
else:
raise NotImplementedError
[docs] def process_order_log(self, log: "OrderLog") -> None:
"""process order log. Called from :func:`process`.
Args:
log (:class:`pams.logs.OrderLog`]): order log
Returns:
None"""
pass
[docs] def process_cancel_log(self, log: "CancelLog") -> None:
"""process cancel log. Called from :func:`process`.
Args:
log (:class:`pams.logs.CancelLog`]): cancel log
Returns:
None
"""
pass
[docs] def process_execution_log(self, log: "ExecutionLog") -> None:
"""process execution log. Called from :func:`process`.
Args:
log (:class:`pams.logs.ExecutionLog`]): execution log
Returns:
None
"""
pass
[docs] def process_simulation_begin_log(self, log: "SimulationBeginLog") -> None:
"""process simulation begin log. Called from :func:`process`.
Args:
log (:class:`pams.logs.SimulationBeginLog`]): simulation begin log
Returns:
None
"""
pass
[docs] def process_simulation_end_log(self, log: "SimulationEndLog") -> None:
"""process simulation end log. Called from :func:`process`.
Args:
log (:class:`pams.logs.SimulationEndLog`]): simulation end log
Returns:
None
"""
pass
[docs] def process_session_begin_log(self, log: "SessionBeginLog") -> None:
"""process session begin log. Called from :func:`process`.
Args:
log (:class:`pams.logs.SessionBeginLog`]): session begin log
Returns:
None
"""
pass
[docs] def process_session_end_log(self, log: "SessionEndLog") -> None:
"""process session end log. Called from :func:`process`.
Args:
log (:class:`pams.logs.SessionEndLog`]): session end log
Returns:
None
"""
pass
[docs] def process_market_step_begin_log(self, log: "MarketStepBeginLog") -> None:
"""process market step begin log. Called from :func:`process`.
Args:
log (:class:`pams.logs.MarketStepBeginLog`]): market step begin log
Returns:
None
"""
pass
[docs] def process_market_step_end_log(self, log: "MarketStepEndLog") -> None:
"""process market step end log. Called from :func:`process`.
Args:
log (:class:`pams.logs.MarketStepEndLog`]): market step end log
Returns:
None
"""
pass