Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import com.coinflow.order.dto.CancelOrderResponse;
import com.coinflow.order.matching.MatchingEngine;
import com.coinflow.order.repository.OrderRepository;
import com.coinflow.order.service.lock.MarketOrderLockScope;
import com.coinflow.order.service.lock.MarketOrderLockService;
import com.coinflow.order.service.command.MarketOrderCommandQueue;
import com.coinflow.wallet.domain.LedgerType;
import com.coinflow.wallet.service.WalletOrderOperationService;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,22 +27,22 @@ public class OrderCancelService {
private final WalletOrderOperationService walletOrderOperationService;
private final MatchingEngine matchingEngine;
private final DomainEventRecorder eventRecorder;
private final MarketOrderLockService marketOrderLockService;
private final MarketOrderCommandQueue marketOrderCommandQueue;
private final TransactionTemplate transactionTemplate;

public OrderCancelService(
OrderRepository orderRepository,
WalletOrderOperationService walletOrderOperationService,
MatchingEngine matchingEngine,
DomainEventRecorder eventRecorder,
MarketOrderLockService marketOrderLockService,
MarketOrderCommandQueue marketOrderCommandQueue,
PlatformTransactionManager transactionManager
) {
this.orderRepository = orderRepository;
this.walletOrderOperationService = walletOrderOperationService;
this.matchingEngine = matchingEngine;
this.eventRecorder = eventRecorder;
this.marketOrderLockService = marketOrderLockService;
this.marketOrderCommandQueue = marketOrderCommandQueue;
this.transactionTemplate = new TransactionTemplate(transactionManager);
}

Expand All @@ -52,16 +51,12 @@ public CancelOrderResponse cancelOrder(Long currentUserId, Long orderId) {
.orElseThrow(() -> new ApiException(ErrorCode.ORDER_NOT_FOUND));
if (!order.isCancelable()) throw new ApiException(ErrorCode.ORDER_NOT_CANCELABLE);

MarketOrderLockScope marketLockScope = marketOrderLockService.acquire(
return marketOrderCommandQueue.submit(
order.getMarketId(),
order.getMarketSymbol(),
order.getSide()
order.getSide(),
() -> transactionTemplate.execute(status -> cancelInTransaction(currentUserId, orderId))
);
try {
return transactionTemplate.execute(status -> cancelInTransaction(currentUserId, orderId));
} finally {
marketLockScope.release();
}
}

private CancelOrderResponse cancelInTransaction(Long currentUserId, Long orderId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ public MarketOrderCommandQueue(OrderCreateStageRecorder stageRecorder, MeterRegi
}

public <T> T submit(Market market, OrderSide side, Supplier<T> command) {
return submit(market.getId(), market.getSymbol(), side, command);
}

public <T> T submit(Long marketId, String marketSymbol, OrderSide side, Supplier<T> command) {
MarketWorker worker = workers.computeIfAbsent(
market.getId(),
ignored -> new MarketWorker(market.getSymbol())
marketId,
ignored -> new MarketWorker(marketSymbol)
);
QueuedCommand<T> queuedCommand = new QueuedCommand<>(market.getSymbol(), side, command);
QueuedCommand<T> queuedCommand = new QueuedCommand<>(marketSymbol, side, command);
worker.submit(queuedCommand);
return queuedCommand.await();
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.coinflow.order.matching.MatchingEngine;
import com.coinflow.order.matching.OrderBookRecoveryService;
import com.coinflow.order.repository.OrderRepository;
import com.coinflow.order.service.lock.MarketOrderLockScope;
import com.coinflow.order.service.lock.MarketOrderLockService;
import com.coinflow.order.service.metrics.OrderCreateStageRecorder;
import com.coinflow.order.service.settlement.OrderSettlementService;
import com.coinflow.trade.domain.Trade;
Expand All @@ -28,7 +26,6 @@
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Service
Expand All @@ -39,7 +36,6 @@ public class AcceptedOrderProcessor {
private final MatchingEngine matchingEngine;
private final OrderBookRecoveryService orderBookRecoveryService;
private final DomainEventRecorder eventRecorder;
private final MarketOrderLockService marketOrderLockService;
private final OrderSettlementService orderSettlementService;
private final OrderCreateStageRecorder stageRecorder;
private final TransactionTemplate transactionTemplate;
Expand All @@ -50,7 +46,6 @@ public AcceptedOrderProcessor(
MatchingEngine matchingEngine,
OrderBookRecoveryService orderBookRecoveryService,
DomainEventRecorder eventRecorder,
MarketOrderLockService marketOrderLockService,
OrderSettlementService orderSettlementService,
OrderCreateStageRecorder stageRecorder,
PlatformTransactionManager transactionManager
Expand All @@ -60,7 +55,6 @@ public AcceptedOrderProcessor(
this.matchingEngine = matchingEngine;
this.orderBookRecoveryService = orderBookRecoveryService;
this.eventRecorder = eventRecorder;
this.marketOrderLockService = marketOrderLockService;
this.orderSettlementService = orderSettlementService;
this.stageRecorder = stageRecorder;
this.transactionTemplate = new TransactionTemplate(transactionManager);
Expand All @@ -76,59 +70,43 @@ public void processAcceptedOrder(Market market, OrderSide side, Long orderId) {
}

private void processAcceptedOrderInternal(Market market, OrderSide side, Long orderId) {
MarketOrderLockScope marketLockScope = marketOrderLockService.acquire(market, side);
AtomicBoolean releaseRegistered = new AtomicBoolean(false);

try {
transactionTemplate.execute(status -> {
Order order = orderRepository.findByIdWithLock(orderId)
.orElseThrow(() -> new ApiException(ErrorCode.ORDER_NOT_FOUND));
if (order.getStatus() != OrderStatus.ACCEPTED) {
return null;
}

order.open();
List<MatchResult> plan = stageRecorder.record(
market.getSymbol(), side, "matching_plan",
() -> matchingEngine.planMatchRejectingSelfTrade(market, order)
);
List<Order> autoCanceledMakers = new ArrayList<>();
List<Trade> trades = stageRecorder.record(
market.getSymbol(), side, "settlement",
() -> orderSettlementService.settle(market, order, plan, autoCanceledMakers, null, false)
);
transactionTemplate.execute(status -> {
Order order = orderRepository.findByIdWithLock(orderId)
.orElseThrow(() -> new ApiException(ErrorCode.ORDER_NOT_FOUND));
if (order.getStatus() != OrderStatus.ACCEPTED) {
return null;
}

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
long orderBookApplyStartedAt = System.nanoTime();
try {
matchingEngine.applyMatchPlan(market, order, plan);
autoCanceledMakers.forEach(canceledMaker ->
matchingEngine.cancelOrder(market.getSymbol(), canceledMaker));
} catch (Exception e) {
log.error("오더북 applyMatchPlan 실패: orderId={}, DB 체결 내역 기반 재빌드 시도", order.getId(), e);
orderBookRecoveryService.rebuildAfterApplyFailure(market.getId());
} finally {
stageRecorder.record(market.getSymbol(), side, "orderbook_after_commit",
System.nanoTime() - orderBookApplyStartedAt);
marketLockScope.release();
}
}
order.open();
List<MatchResult> plan = stageRecorder.record(
market.getSymbol(), side, "matching_plan",
() -> matchingEngine.planMatchRejectingSelfTrade(market, order)
);
List<Order> autoCanceledMakers = new ArrayList<>();
List<Trade> trades = stageRecorder.record(
market.getSymbol(), side, "settlement",
() -> orderSettlementService.settle(market, order, plan, autoCanceledMakers, null, false)
);

@Override
public void afterCompletion(int status) {
marketLockScope.release();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
long orderBookApplyStartedAt = System.nanoTime();
try {
matchingEngine.applyMatchPlan(market, order, plan);
autoCanceledMakers.forEach(canceledMaker ->
matchingEngine.cancelOrder(market.getSymbol(), canceledMaker));
} catch (Exception e) {
log.error("오더북 applyMatchPlan 실패: orderId={}, DB 체결 내역 기반 재빌드 시도", order.getId(), e);
orderBookRecoveryService.rebuildAfterApplyFailure(market.getId());
} finally {
stageRecorder.record(market.getSymbol(), side, "orderbook_after_commit",
System.nanoTime() - orderBookApplyStartedAt);
}
});
releaseRegistered.set(true);
return trades;
}
});
} finally {
if (!releaseRegistered.get()) {
marketLockScope.release();
}
}
return trades;
});
}

private void rejectAcceptedOrder(Long orderId, String reason) {
Expand Down
Loading
Loading