// High-Performance C++ Execution Engine for ES Futures Scalping
// Optimized for microsecond latency with lock-free data structures
#pragma once
#include <arpa/inet.h>
#include <fcntl.h>
#include <immintrin.h> // For SIMD operations
#include <netinet/in.h>
#include <netinet/tcp.h> // TCP_NODELAY
#include <pthread.h> // pthread_setaffinity_np
#include <sched.h> // cpu_set_t, CPU_ZERO, CPU_SET
#include <sys/mman.h> // For memory mapping
#include <sys/socket.h>
#include <sys/types.h> // ssize_t
#include <unistd.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <thread>
// =============================================================================
// ULTRA LOW-LATENCY TYPES AND CONSTANTS
// =============================================================================
// Clock used for every timestamp in this file.
// NOTE(review): std::chrono::high_resolution_clock is not guaranteed to be
// steady; confirm it is monotonic on the target platform before relying on
// differences between two readings.
using HighResClock = std::chrono::high_resolution_clock;
using TimePoint = HighResClock::time_point;
using Nanoseconds = std::chrono::nanoseconds;
// Alignment applied to shared structures and atomics below.
// NOTE(review): assumes 64-byte cache lines - confirm for the target CPU.
constexpr size_t CACHE_LINE_SIZE = 64;
// Capacity of the order and execution-report ring buffers; it is used as a
// LockFreeRingBuffer size, so it must be a power of 2.
constexpr size_t MAX_ORDERS = 1024;
constexpr size_t RING_BUFFER_SIZE = 8192; // Market-tick ring capacity; must be power of 2
// Minimum price increment: 0.25 index points, i.e. 25 Price units of 0.01 point.
constexpr int32_t ES_TICK_SIZE = 25;
constexpr int32_t ES_POINT_VALUE = 50; // $50 per full index point
// Fixed-point price representation (avoids floating point).
using Price = int64_t; // Price in hundredths of a point (e.g., 4500.25 = 450025)
using Quantity = int32_t;
using OrderId = uint64_t;
// =============================================================================
// LOCK-FREE RING BUFFER FOR ULTRA-LOW LATENCY
// =============================================================================
// Fixed-capacity ring buffer coordinated purely through two atomic indices.
//
// Both indices are advanced with a load followed by a plain store (there is no
// compare-exchange), so the container is only safe when a single thread calls
// try_push() and a single thread calls try_pop(). One slot is always kept
// empty to tell "full" apart from "empty", so at most Size - 1 items are held.
template<typename T, size_t Size>
class alignas(CACHE_LINE_SIZE) LockFreeRingBuffer {
    static_assert((Size & (Size - 1)) == 0, "Size must be power of 2");

private:
    // Each index and the storage start on their own cache line.
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> head_{0};  // next slot to read
    alignas(CACHE_LINE_SIZE) std::atomic<size_t> tail_{0};  // next slot to write
    alignas(CACHE_LINE_SIZE) std::array<T, Size> buffer_;

    // Reduces an index modulo Size (Size is a power of two).
    static constexpr size_t wrap(size_t index) noexcept {
        return index & (Size - 1);
    }

public:
    // Copies `item` into the buffer. Returns false, leaving the buffer
    // untouched, when no free slot is available.
    bool try_push(const T& item) noexcept {
        const size_t write_pos = tail_.load(std::memory_order_relaxed);
        const size_t following = wrap(write_pos + 1);
        const bool has_room = (following != head_.load(std::memory_order_acquire));
        if (has_room) {
            buffer_[write_pos] = item;
            // Release: the slot contents become visible before the new tail.
            tail_.store(following, std::memory_order_release);
        }
        return has_room;
    }

    // Copies the oldest element into `item`. Returns false, leaving `item`
    // untouched, when the buffer is empty.
    bool try_pop(T& item) noexcept {
        const size_t read_pos = head_.load(std::memory_order_relaxed);
        const bool has_data = (read_pos != tail_.load(std::memory_order_acquire));
        if (has_data) {
            item = buffer_[read_pos];
            // Release: the slot is handed back only after it has been copied out.
            head_.store(wrap(read_pos + 1), std::memory_order_release);
        }
        return has_data;
    }

    // Number of stored elements, computed from two independent loads; the
    // value may already be stale when it is returned.
    size_t size() const noexcept {
        const size_t write_pos = tail_.load(std::memory_order_acquire);
        const size_t read_pos = head_.load(std::memory_order_acquire);
        return wrap(write_pos - read_pos);
    }
};
// =============================================================================
// MARKET DATA STRUCTURES
// =============================================================================
// Snapshot of the top of the book, aligned to CACHE_LINE_SIZE.
//
// Every field carries a default member initializer so that a
// default-constructed tick (for example a reusable parse target) never holds
// indeterminate values.
struct alignas(CACHE_LINE_SIZE) MarketTick {
    TimePoint timestamp{};          // when this tick object was stamped
    Price bid_price = 0;
    Price ask_price = 0;
    Quantity bid_size = 0;
    Quantity ask_size = 0;
    Quantity last_traded_qty = 0;
    Price last_traded_price = 0;
    uint64_t sequence_number = 0;

    MarketTick() = default;

    // Builds a quote-only tick stamped with the current time; the trade
    // fields and the sequence number keep their zero defaults.
    MarketTick(Price bid, Price ask, Quantity bid_sz, Quantity ask_sz)
        : timestamp(HighResClock::now())
        , bid_price(bid)
        , ask_price(ask)
        , bid_size(bid_sz)
        , ask_size(ask_sz) {}

    // Midpoint of bid and ask, using integer division (truncates toward zero).
    inline Price mid_price() const noexcept {
        return (bid_price + ask_price) / 2;
    }

    // Ask minus bid, in Price units.
    inline Price spread() const noexcept {
        return ask_price - bid_price;
    }
};
// An order instruction queued for the execution thread.
//
// All fields have default member initializers, so a default-constructed
// request (used as a pop target for the ring buffer) holds determinate values.
// order_id 0 is never produced by generate_order_id(), which starts at 1.
struct alignas(CACHE_LINE_SIZE) OrderRequest {
    enum class Type : uint8_t { BUY, SELL, CANCEL, MODIFY };
    enum class TimeInForce : uint8_t { IOC, FOK, GTC };

    OrderId order_id = 0;
    Type type = Type::BUY;
    TimeInForce tif = TimeInForce::IOC;
    Price price = 0;
    Quantity quantity = 0;
    TimePoint timestamp{};
    uint64_t strategy_id = 0;

    OrderRequest() = default;

    // Creates a request with a freshly generated order id, stamped with the
    // current time. The time-in-force keeps its IOC default.
    OrderRequest(Type t, Price p, Quantity q, uint64_t strat_id = 0)
        : order_id(generate_order_id())
        , type(t)
        , price(p)
        , quantity(q)
        , timestamp(HighResClock::now())
        , strategy_id(strat_id) {}

private:
    // Returns the next id from a process-wide atomic counter starting at 1.
    static OrderId generate_order_id() {
        static std::atomic<OrderId> counter{1};
        return counter.fetch_add(1, std::memory_order_relaxed);
    }
};
// Outcome of an order as reported back to the strategy.
//
// Default member initializers guarantee that both constructors leave every
// field, including the reject_reason text buffer, in a determinate
// (zeroed) state.
struct alignas(CACHE_LINE_SIZE) ExecutionReport {
    enum class Status : uint8_t {
        NEW, PARTIALLY_FILLED, FILLED, CANCELLED, REJECTED
    };

    OrderId order_id = 0;
    Status status = Status::NEW;
    Price fill_price = 0;
    Quantity fill_quantity = 0;
    Quantity remaining_quantity = 0;
    TimePoint timestamp{};
    char reject_reason[32] = {};  // zero-filled, hence always NUL-terminated when empty

    ExecutionReport() = default;

    // Builds a report stamped with the current time. remaining_quantity and
    // reject_reason keep their zero defaults.
    ExecutionReport(OrderId id, Status st, Price price = 0, Quantity qty = 0)
        : order_id(id)
        , status(st)
        , fill_price(price)
        , fill_quantity(qty)
        , timestamp(HighResClock::now()) {}
};
// =============================================================================
// HIGH-PERFORMANCE NETWORK HANDLER
// =============================================================================
// Owner of a single IPv4 socket, used either as a TCP client or as a UDP
// multicast receiver.
//
// The descriptor is closed by the destructor, and also whenever an
// initialize_* call fails or is repeated, so no descriptor is leaked.
// Copying is disabled because two copies would close the same descriptor.
class UltraLowLatencySocket {
private:
    int socket_fd_;
    bool is_connected_;

    // Closes the descriptor (if any) and resets the object to its initial state.
    void close_socket() noexcept {
        if (socket_fd_ >= 0) {
            ::close(socket_fd_);
            socket_fd_ = -1;
        }
        is_connected_ = false;
    }

    // True when `port` fits in the 16-bit port field without truncation.
    static bool is_valid_port(int port) noexcept {
        return port >= 0 && port <= 65535;
    }

    // Best-effort integer socket option: a failure is deliberately ignored.
    static void set_int_option(int fd, int level, int name, int value) noexcept {
        (void)setsockopt(fd, level, name, &value, sizeof(value));
    }

public:
    UltraLowLatencySocket() : socket_fd_(-1), is_connected_(false) {}

    ~UltraLowLatencySocket() {
        close_socket();
    }

    UltraLowLatencySocket(const UltraLowLatencySocket&) = delete;
    UltraLowLatencySocket& operator=(const UltraLowLatencySocket&) = delete;

    // Connects to `host`:`port` over TCP.
    //
    // `host` must be a dotted-quad IPv4 address (it is parsed with inet_pton,
    // no name resolution is performed). Returns true once the connection is
    // established; on any failure the socket is closed and false is returned.
    bool initialize_tcp_socket(const char* host, int port) {
        close_socket();  // drop any socket left over from an earlier call
        if (host == nullptr || !is_valid_port(port)) return false;

        struct sockaddr_in addr;
        std::memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_port = htons(static_cast<in_port_t>(port));
        if (inet_pton(AF_INET, host, &addr.sin_addr) != 1) return false;

        socket_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (socket_fd_ < 0) return false;

        // Low-latency tuning; each option is best-effort.
        set_int_option(socket_fd_, IPPROTO_TCP, TCP_NODELAY, 1);
        const int buffer_size = 1024 * 1024;  // 1MB
        set_int_option(socket_fd_, SOL_SOCKET, SO_RCVBUF, buffer_size);
        set_int_option(socket_fd_, SOL_SOCKET, SO_SNDBUF, buffer_size);

        // Connect while the socket is still blocking. On a non-blocking
        // socket connect() normally returns -1 with EINPROGRESS, which would
        // be reported here as a failure.
        if (connect(socket_fd_, reinterpret_cast<struct sockaddr*>(&addr),
                    sizeof(addr)) != 0) {
            close_socket();
            return false;
        }

        // Switch to non-blocking only after the connection exists.
        // Best-effort: send_data/receive_data pass MSG_DONTWAIT on every call.
        const int flags = fcntl(socket_fd_, F_GETFL, 0);
        if (flags >= 0) {
            (void)fcntl(socket_fd_, F_SETFL, flags | O_NONBLOCK);
        }

        is_connected_ = true;
        return true;
    }

    // Binds a UDP socket to `port` on all interfaces and joins
    // `multicast_group` (a dotted-quad IPv4 address). On any failure the
    // socket is closed and false is returned.
    bool initialize_udp_multicast(const char* multicast_group, int port) {
        close_socket();  // drop any socket left over from an earlier call
        if (multicast_group == nullptr || !is_valid_port(port)) return false;

        struct ip_mreq mreq;
        std::memset(&mreq, 0, sizeof(mreq));
        if (inet_pton(AF_INET, multicast_group, &mreq.imr_multiaddr) != 1) {
            return false;
        }
        mreq.imr_interface.s_addr = htonl(INADDR_ANY);

        socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
        if (socket_fd_ < 0) return false;

        // Enable address reuse (best-effort).
        set_int_option(socket_fd_, SOL_SOCKET, SO_REUSEADDR, 1);

        struct sockaddr_in addr;
        std::memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(static_cast<in_port_t>(port));
        if (bind(socket_fd_, reinterpret_cast<struct sockaddr*>(&addr),
                 sizeof(addr)) < 0) {
            close_socket();
            return false;
        }

        if (setsockopt(socket_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
                       sizeof(mreq)) < 0) {
            close_socket();
            return false;
        }

        is_connected_ = true;
        return true;
    }

    // Non-blocking send. Returns the number of bytes accepted by the kernel,
    // which may be fewer than `size`, or -1 on error or when not connected.
    ssize_t send_data(const void* data, size_t size) {
        if (!is_connected_) return -1;
        return send(socket_fd_, data, size, MSG_DONTWAIT);
    }

    // Non-blocking receive. Returns the number of bytes read, or -1 on error,
    // when nothing is available, or when not connected.
    ssize_t receive_data(void* buffer, size_t buffer_size) {
        if (!is_connected_) return -1;
        return recv(socket_fd_, buffer, buffer_size, MSG_DONTWAIT);
    }

    int get_fd() const { return socket_fd_; }
    bool is_connected() const { return is_connected_; }
};
// =============================================================================
// MARKET DATA FEED HANDLER
// =============================================================================
class MarketDataFeedHandler {
private:
UltraLowLatencySocket market_data_socket_;
LockFreeRingBuffer<MarketTick, RING_BUFFER_SIZE> tick_buffer_;
std::atomic<bool> running_{false};
std::thread processing_thread_;
// Latest market state (lock-free access)
alignas(CACHE_LINE_SIZE) std::atomic<Price> best_bid_{0};
alignas(CACHE_LINE_SIZE) std::atomic<Price> best_ask_{0};
alignas(CACHE_LINE_SIZE) std::atomic<Quantity> bid_size_{0};
alignas(CACHE_LINE_SIZE) std::atomic<Quantity> ask_size_{0};
alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> last_update_ns_{0};
public:
MarketDataFeedHandler() = default;
~MarketDataFeedHandler() {
stop();
}
bool start_feed(const char* multicast_group, int port) {
if (!market_data_socket_.initialize_udp_multicast(multicast_group, port)) {
return false;
}
running_.store(true, std::memory_order_release);
processing_thread_ =
std::thread(&MarketDataFeedHandler::process_market_data, this);
return true;
}
void stop() {
running_.store(false, std::memory_order_release);
if (processing_thread_.joinable()) {
processing_thread_.join();
}
}
// Ultra-fast market state access (lock-free)
inline Price get_best_bid() const noexcept {
return best_bid_.load(std::memory_order_acquire);
}
inline Price get_best_ask() const noexcept {
return best_ask_.load(std::memory_order_acquire);
}
inline Price get_mid_price() const noexcept {
return (get_best_bid() + get_best_ask()) / 2;
}
inline Price get_spread() const noexcept {
return get_best_ask() - get_best_bid();
}
inline uint64_t get_last_update_ns() const noexcept {
return last_update_ns_.load(std::memory_order_acquire);
}
bool get_latest_tick(MarketTick& tick) {
return tick_buffer_.try_pop(tick);
}
private:
void process_market_data() {
// Set thread affinity and priority for optimal performance
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(2, &cpuset); // Bind to CPU core 2
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
char buffer[4096];
MarketTick tick;
while (running_.load(std::memory_order_acquire)) {
ssize_t bytes_received = market_data_socket_.receive_data(buffer,
sizeof(buffer));
if (bytes_received > 0) {
// Parse market data (simplified binary format)
if (parse_market_data(buffer, bytes_received, tick)) {
// Update atomic variables for ultra-fast access
best_bid_.store(tick.bid_price, std::memory_order_release);
best_ask_.store(tick.ask_price, std::memory_order_release);
bid_size_.store(tick.bid_size, std::memory_order_release);
ask_size_.store(tick.ask_size, std::memory_order_release);
auto now_ns = std::chrono::duration_cast<Nanoseconds>(
HighResClock::now().time_since_epoch()).count();
last_update_ns_.store(now_ns, std::memory_order_release);
// Add to ring buffer for strategy processing
tick_buffer_.try_push(tick);
}
}
// Minimal sleep to prevent CPU spinning
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
}
bool parse_market_data(const char* buffer, size_t size, MarketTick& tick) {
// Simplified binary parser - in production, use exchange-specific protocol
if (size < sizeof(MarketTick)) return false;
// Example: Parse CME MDP 3.0 format (simplified)
struct MDPMessage {
uint32_t sequence_number;
uint64_t timestamp;
Price bid_price;
Price ask_price;
Quantity bid_size;
Quantity ask_size;
} __attribute__((packed));
if (size >= sizeof(MDPMessage)) {
const MDPMessage* msg = reinterpret_cast<const MDPMessage*>(buffer);
tick.sequence_number = msg->sequence_number;
tick.timestamp = HighResClock::now();
tick.bid_price = msg->bid_price;
tick.ask_price = msg->ask_price;
tick.bid_size = msg->bid_size;
tick.ask_size = msg->ask_size;
return true;
}
return false;
}
};
// =============================================================================
// ULTRA-LOW LATENCY ORDER EXECUTION ENGINE
// =============================================================================
class OrderExecutionEngine {
private:
UltraLowLatencySocket order_socket_;
LockFreeRingBuffer<OrderRequest, MAX_ORDERS> pending_orders_;
LockFreeRingBuffer<ExecutionReport, MAX_ORDERS> execution_reports_;
std::atomic<bool> running_{false};
std::thread execution_thread_;
// Performance counters
alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> orders_sent_{0};
alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> orders_filled_{0};
alignas(CACHE_LINE_SIZE) std::atomic<uint64_t> total_latency_ns_{0};
public:
OrderExecutionEngine() = default;
~OrderExecutionEngine() {
stop();
}
bool start_execution_engine(const char* host, int port) {
if (!order_socket_.initialize_tcp_socket(host, port)) {
return false;
}
running_.store(true, std::memory_order_release);
execution_thread_ = std::thread(&OrderExecutionEngine::process_orders,
this);
return true;
}
void stop() {
running_.store(false, std::memory_order_release);
if (execution_thread_.joinable()) {
execution_thread_.join();
}
}
// Ultra-fast order submission
bool submit_order(OrderRequest::Type type, Price price, Quantity quantity) {
OrderRequest order(type, price, quantity);
// Record submission time for latency measurement
order.timestamp = HighResClock::now();
bool success = pending_orders_.try_push(order);
if (success) {
orders_sent_.fetch_add(1, std::memory_order_relaxed);
}
return success;
}
bool get_execution_report(ExecutionReport& report) {
return execution_reports_.try_pop(report);
}
// Performance metrics
uint64_t get_orders_sent() const {
return orders_sent_.load(std::memory_order_acquire);
}
uint64_t get_orders_filled() const {
return orders_filled_.load(std::memory_order_acquire);
}
double get_average_latency_us() const {
uint64_t total_ns = total_latency_ns_.load(std::memory_order_acquire);
uint64_t filled = get_orders_filled();
return filled > 0 ? (total_ns / filled) / 1000.0 : 0.0;
}
private:
void process_orders() {
// Set thread affinity and priority
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(3, &cpuset); // Bind to CPU core 3
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
OrderRequest order;
char send_buffer[1024];
char recv_buffer[4096];
while (running_.load(std::memory_order_acquire)) {
// Process outgoing orders
while (pending_orders_.try_pop(order)) {
if (send_order_to_exchange(order, send_buffer,
sizeof(send_buffer))) {
// Order sent successfully - now wait for execution report
}
}
// Process incoming execution reports
ssize_t bytes_received = order_socket_.receive_data(recv_buffer,
sizeof(recv_buffer));
if (bytes_received > 0) {
ExecutionReport report;
if (parse_execution_report(recv_buffer, bytes_received, report)) {
// Calculate latency
auto now = HighResClock::now();
auto latency_ns = std::chrono::duration_cast<Nanoseconds>(
now.time_since_epoch()).count() -
std::chrono::duration_cast<Nanoseconds>(
report.timestamp.time_since_epoch()).count();
total_latency_ns_.fetch_add(latency_ns,
std::memory_order_relaxed);
orders_filled_.fetch_add(1, std::memory_order_relaxed);
execution_reports_.try_push(report);
}
}
// Minimal sleep
std::this_thread::sleep_for(std::chrono::nanoseconds(50));
}
}
bool send_order_to_exchange(const OrderRequest& order, char* buffer, size_t
buffer_size) {
// Simplified FIX protocol message construction
struct FixMessage {
char begin_string[16] = "FIX.4.4";
char msg_type = (order.type == OrderRequest::Type::BUY) ? 'D' :
'D'; // New Order Single
uint64_t order_id;
char side = (order.type == OrderRequest::Type::BUY) ? '1' : '2';
Price price;
Quantity quantity;
char time_in_force = 'I'; // IOC
char ord_type = '2'; // Limit order
} __attribute__((packed));
FixMessage* msg = reinterpret_cast<FixMessage*>(buffer);
msg->order_id = order.order_id;
msg->price = order.price;
msg->quantity = order.quantity;
ssize_t bytes_sent = order_socket_.send_data(buffer, sizeof(FixMessage));
return bytes_sent > 0;
}
bool parse_execution_report(const char* buffer, size_t size, ExecutionReport&
report) {
// Simplified execution report parser
if (size < 32) return false;
// Parse execution report from exchange (simplified)
struct ExecReportMsg {
uint64_t order_id;
char exec_type; // '0' = New, '1' = Partial Fill, '2' = Fill
Price fill_price;
Quantity fill_quantity;
Quantity remaining_quantity;
} __attribute__((packed));
if (size >= sizeof(ExecReportMsg)) {
const ExecReportMsg* msg = reinterpret_cast<const
ExecReportMsg*>(buffer);
report.order_id = msg->order_id;
report.fill_price = msg->fill_price;
report.fill_quantity = msg->fill_quantity;
report.remaining_quantity = msg->remaining_quantity;
report.timestamp = HighResClock::now();
// Map execution type
switch (msg->exec_type) {
case '0': report.status = ExecutionReport::Status::NEW; break;
case '1': report.status =
ExecutionReport::Status::PARTIALLY_FILLED; break;
case '2': report.status = ExecutionReport::Status::FILLED; break;
default: report.status = ExecutionReport::Status::REJECTED; break;
}
return true;
}
return false;
}
};
// =============================================================================
// STRATEGY INTERFACE FOR PYTHON INTEGRATION
// =============================================================================
// Trading instruction handed from the strategy layer to the execution
// controller. A default-constructed signal is a HOLD stamped with the time of
// construction.
class StrategySignal {
public:
    enum class Action : uint8_t { HOLD, BUY, SELL };

    Action action{Action::HOLD};
    Price target_price{0};
    Quantity size{0};
    Price stop_loss{0};
    Price take_profit{0};
    double confidence{0.0};
    TimePoint timestamp;

    StrategySignal() : timestamp(HighResClock::now()) {}

    // A signal is actionable only when it asks for a trade, names a positive
    // size and carries a confidence strictly above 0.3.
    bool is_valid() const {
        if (action == Action::HOLD) {
            return false;
        }
        if (size <= 0) {
            return false;
        }
        return confidence > 0.3;
    }
};
// =============================================================================
// MAIN EXECUTION CONTROLLER
// =============================================================================
class UltraLowLatencyExecutionController {
private:
MarketDataFeedHandler market_data_handler_;
OrderExecutionEngine execution_engine_;
// Current position tracking
std::atomic<Quantity> current_position_{0};
std::atomic<Price> position_entry_price_{0};
std::atomic<TimePoint> position_entry_time_;
// Performance tracking
std::atomic<int64_t> realized_pnl_{0}; // In ticks
std::atomic<uint64_t> total_trades_{0};
bool running_{false};
std::thread strategy_thread_;
public:
UltraLowLatencyExecutionController() = default;
~UltraLowLatencyExecutionController() {
stop();
}
bool initialize(const char* md_multicast_group, int md_port,
const char* order_host, int order_port) {
// Start market data feed
if (!market_data_handler_.start_feed(md_multicast_group, md_port)) {
return false;
}
// Start order execution engine
if (!execution_engine_.start_execution_engine(order_host, order_port)) {
market_data_handler_.stop();
return false;
}
// Start strategy processing thread
running_ = true;
strategy_thread_ =
std::thread(&UltraLowLatencyExecutionController::strategy_loop, this);
return true;
}
void stop() {
running_ = false;
if (strategy_thread_.joinable()) {
strategy_thread_.join();
}
execution_engine_.stop();
market_data_handler_.stop();
}
// Interface for Python strategy signals
bool process_strategy_signal(const StrategySignal& signal) {
if (!signal.is_valid()) {
return false;
}
// Get current market state
Price current_bid = market_data_handler_.get_best_bid();
Price current_ask = market_data_handler_.get_best_ask();
Price spread = market_data_handler_.get_spread();
// Risk checks
if (spread > 2 * ES_TICK_SIZE) { // Don't trade if spread > 0.5 points
return false;
}
Quantity current_pos = current_position_.load(std::memory_order_acquire);
if (signal.action == StrategySignal::Action::BUY && current_pos <= 0) {
return execution_engine_.submit_order(
OrderRequest::Type::BUY,
current_ask, // Market order at ask
signal.size
);
} else if (signal.action == StrategySignal::Action::SELL && current_pos >=
0) {
return execution_engine_.submit_order(
OrderRequest::Type::SELL,
current_bid, // Market order at bid
signal.size
);
}
return false;
}
// Performance metrics
struct PerformanceStats {
uint64_t total_trades;
int64_t realized_pnl_ticks;
double realized_pnl_dollars;
double average_latency_us;
Quantity current_position;
uint64_t orders_sent;
uint64_t orders_filled;
};
PerformanceStats get_performance_stats() const {
PerformanceStats stats;
stats.total_trades = total_trades_.load(std::memory_order_acquire);
stats.realized_pnl_ticks = realized_pnl_.load(std::memory_order_acquire);
stats.realized_pnl_dollars = (stats.realized_pnl_ticks * ES_POINT_VALUE) /
100.0;
stats.average_latency_us = execution_engine_.get_average_latency_us();
stats.current_position = current_position_.load(std::memory_order_acquire);
stats.orders_sent = execution_engine_.get_orders_sent();
stats.orders_filled = execution_engine_.get_orders_filled();
return stats;
}
private:
void strategy_loop() {
// Set thread affinity
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(1, &cpuset); // Bind to CPU core 1
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
ExecutionReport report;
MarketTick tick;
while (running_) {
// Process execution reports
while (execution_engine_.get_execution_report(report)) {
handle_execution_report(report);
}
// Process market data
while (market_data_handler_.get_latest_tick(tick)) {
// Market data is available for strategy decisions
// In production, this would trigger Python strategy callbacks
}
// Minimal sleep
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
}
void handle_execution_report(const ExecutionReport& report) {
if (report.status == ExecutionReport::Status::FILLED ||
report.status == ExecutionReport::Status::PARTIALLY_FILLED) {
Quantity current_pos =
current_position_.load(std::memory_order_acquire);
Price entry_price =
position_entry_price_.load(std::memory_order_acquire);
// Update position
if (current_pos == 0) {
// Opening new position
current_position_.store(report.fill_quantity,
std::memory_order_release);
position_entry_price_.store(report.fill_price,
std::memory_order_release);
position_entry_time_.store(HighResClock::now(),
std::memory_order_release);
} else if ((current_pos > 0 && report.fill_quantity < 0) ||
(current_pos < 0 && report.fill_quantity > 0)) {
// Closing position - calculate P&L
int64_t pnl_ticks = (report.fill_price - entry_price) *
std::min(abs(current_pos),
abs(report.fill_quantity));
if (current_pos < 0) {
pnl_ticks = -pnl_ticks; // Short position
}
realized_pnl_.fetch_add(pnl_ticks, std::memory_order_relaxed);
total_trades_.fetch_add(1, std::memory_order_relaxed);
// Update position
current_position_.store(current_pos + report.fill_quantity,
std::memory_order_release);
}
}
}
};
// =============================================================================
// C INTERFACE FOR PYTHON INTEGRATION
// =============================================================================
extern "C" {
// C interface for Python ctypes integration