Transformer Communication
Transformers in the Transmission library can communicate with each other through several mechanisms: Effects, Computations, Executions, and Checkpoints. This enables complex business logic flows while maintaining loose coupling between components.
Communication Methods
1. Effects (Asynchronous Communication)
Effects are the primary way for Transformers to communicate asynchronously.
Publishing Effects
class SourceTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<TriggerActionSignal> { signal ->
// Publish effect to all listening transformers
publish(DataProcessingEffect(signal.data))
}
}
}
class TargetTransformer : Transformer() {
override val handlers: Handlers = handlers {
onEffect<DataProcessingEffect> { effect ->
// Handle the effect
val processedData = processData(effect.data)
send(ProcessedData(processedData))
}
}
}
Targeted Effects
class SpecificTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<SendToSpecificSignal> { signal ->
// Send effect to a specific transformer
publish(
effect = SpecificEffect(signal.data),
identity = targetTransformerIdentity
)
}
}
}
2. Computations (Synchronous Queries)
Computations allow one Transformer to query data from another synchronously.
Simple Computations
class DataProviderTransformer : Transformer() {
private var currentData = "Initial State"
override val computations: Computations = computations {
register(getCurrentDataContract) {
currentData
}
register(getDataStatusContract) {
DataStatus(
value = currentData,
lastUpdated = System.currentTimeMillis(),
isValid = currentData.isNotEmpty()
)
}
}
override val handlers: Handlers = handlers {
onSignal<UpdateDataSignal> { signal ->
currentData = signal.newData
send(DataUpdated(currentData))
}
}
companion object {
val getCurrentDataContract = Contract.computation<String>()
val getDataStatusContract = Contract.computation<DataStatus>()
}
}
class DataConsumerTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<ProcessDataSignal> {
// Query data from another transformer
val currentData = compute(DataProviderTransformer.getCurrentDataContract)
val status = compute(DataProviderTransformer.getDataStatusContract)
if (status.isValid) {
val result = processData(currentData)
send(ProcessingResult(result))
} else {
send(ProcessingError("Invalid data state"))
}
}
}
}
Computations with Arguments
class CalculationTransformer : Transformer() {
override val computations: Computations = computations {
register(calculateSumContract) { numbers: List<Int> ->
numbers.sum()
}
register(formatCurrencyContract) { amount: Double ->
"%.2f USD".format(amount)
}
register(validateDataContract) { data: InputData ->
ValidationResult(
isValid = data.isNotEmpty() && data.isNumeric(),
errors = validateInput(data)
)
}
}
companion object {
val calculateSumContract = Contract.computationWithArgs<List<Int>, Int>()
val formatCurrencyContract = Contract.computationWithArgs<Double, String>()
val validateDataContract = Contract.computationWithArgs<InputData, ValidationResult>()
}
}
class BusinessLogicTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<ProcessOrderSignal> { signal ->
// Validate input data
val validation = compute(CalculationTransformer.validateDataContract, signal.orderData)
if (!validation.isValid) {
send(OrderValidationFailed(validation.errors))
return@onSignal
}
// Calculate totals
val itemPrices = signal.orderData.items.map { it.price }
val total = compute(CalculationTransformer.calculateSumContract, itemPrices)
// Format for display
val formattedTotal = compute(CalculationTransformer.formatCurrencyContract, total.toDouble())
send(OrderProcessed(signal.orderData.id, formattedTotal))
}
}
}
3. Executions (Fire-and-Forget Operations)
Executions are used for side effects that don't return values.
class AuditTransformer : Transformer() {
override val executions: Executions = executions {
register(logUserActionContract) { action: UserAction ->
writeAuditLog(action.userId, action.action, action.timestamp)
}
register(sendNotificationContract) { notification: Notification ->
sendPushNotification(notification.userId, notification.message)
}
register(updateAnalyticsContract) { event: AnalyticsEvent ->
analyticsService.track(event.name, event.properties)
}
}
companion object {
val logUserActionContract = Contract.executionWithArgs<UserAction>()
val sendNotificationContract = Contract.executionWithArgs<Notification>()
val updateAnalyticsContract = Contract.executionWithArgs<AnalyticsEvent>()
}
}
class UserInteractionTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<UserClickedButtonSignal> { signal ->
// Fire-and-forget operations
execute(AuditTransformer.logUserActionContract, UserAction(
userId = signal.userId,
action = "button_click:${signal.buttonId}",
timestamp = System.currentTimeMillis()
))
execute(AuditTransformer.updateAnalyticsContract, AnalyticsEvent(
name = "button_click",
properties = mapOf("button_id" to signal.buttonId)
))
// Continue with main business logic
send(ButtonClickProcessed(signal.buttonId))
}
}
}
Complex Communication Patterns
Chain of Responsibility
class ValidationChainTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<ValidateDataSignal> { signal ->
// Start validation chain
publish(ValidateFormatEffect(signal.data))
}
onEffect<ValidateFormatEffect> { effect ->
if (isValidFormat(effect.data)) {
publish(ValidateBusinessRulesEffect(effect.data))
} else {
send(ValidationFailed("Invalid format"))
}
}
onEffect<ValidateBusinessRulesEffect> { effect ->
if (passesBusinessRules(effect.data)) {
publish(ValidateSecurityEffect(effect.data))
} else {
send(ValidationFailed("Business rules violation"))
}
}
onEffect<ValidateSecurityEffect> { effect ->
if (isSecure(effect.data)) {
send(ValidationPassed(effect.data))
} else {
send(ValidationFailed("Security check failed"))
}
}
}
}
Observer Pattern
class EventBroadcasterTransformer : Transformer() {
private val subscribersHolder = dataHolder(
initialValue = SubscriberState(),
contract = subscriberContract
)
override val handlers: Handlers = handlers {
onSignal<SubscribeToEventsSignal> { signal ->
subscribersHolder.update { state ->
state.copy(subscribers = state.subscribers + signal.subscriberId)
}
}
onSignal<BroadcastEventSignal> { signal ->
val subscribers = subscribersHolder.getValue().subscribers
subscribers.forEach { subscriberId ->
publish(EventBroadcastEffect(subscriberId, signal.event))
}
}
}
companion object {
val subscriberContract = Contract.dataHolder<SubscriberState>()
}
}
class EventListenerTransformer(private val listenerId: String) : Transformer() {
override val handlers: Handlers = handlers {
onEffect<EventBroadcastEffect> { effect ->
if (effect.subscriberId == listenerId) {
// Handle the broadcasted event
handleEvent(effect.event)
}
}
}
private suspend fun CommunicationScope.handleEvent(event: Event) {
when (event.type) {
"user_action" -> send(UserActionReceived(event.data))
"system_update" -> send(SystemUpdateReceived(event.data))
}
}
}
State Machine Communication
class OrderStateMachineTransformer : Transformer() {
private val orderStateHolder = dataHolder(
initialValue = OrderState.Pending,
contract = orderStateContract
)
override val computations: Computations = computations {
register(getCurrentOrderStateContract) {
orderStateHolder.getValue()
}
register(canTransitionToContract) { targetState: OrderState ->
val currentState = orderStateHolder.getValue()
isValidTransition(currentState, targetState)
}
}
override val handlers: Handlers = handlers {
onSignal<ProcessOrderSignal> { signal ->
val currentState = orderStateHolder.getValue()
when (currentState) {
is OrderState.Pending -> {
orderStateHolder.update { OrderState.Processing }
publish(StartOrderProcessingEffect(signal.orderId))
}
is OrderState.Processing -> {
send(OrderAlreadyProcessing(signal.orderId))
}
is OrderState.Completed -> {
send(OrderAlreadyCompleted(signal.orderId))
}
}
}
onEffect<OrderProcessingCompleteEffect> { effect ->
orderStateHolder.update { OrderState.Completed }
publish(OrderCompletedEffect(effect.orderId))
}
}
companion object {
val orderStateContract = Contract.dataHolder<OrderState>()
val getCurrentOrderStateContract = Contract.computation<OrderState>()
val canTransitionToContract = Contract.computationWithArgs<OrderState, Boolean>()
}
}
sealed class OrderState : Transmission.Data {
object Pending : OrderState()
object Processing : OrderState()
object Completed : OrderState()
}
Examples from Samples
Components Sample Communication
@OptIn(ExperimentalTransmissionApi::class)
class InputTransformer : Transformer() {
private val holder = dataHolder(InputUiState(), holderContract)
override val computations: Computations = computations {
register(writtenInputContract) {
delay(1.seconds)
WrittenInput(holder.getValue().writtenText)
}
register(writtenInputWithArgs) {
WrittenInput(it)
}
}
override val handlers: Handlers = handlers {
onSignal<InputSignal.InputUpdate> { signal ->
holder.update { it.copy(writtenText = signal.value) }
// Checkpoint-based communication
val color = pauseOn(colorCheckpoint)
// Send effect to specific transformer
send(
effect = ColorPickerEffect.SelectedColorUpdate(color),
identity = multiOutputTransformerIdentity
)
// Broadcast effect to all listeners
publish(effect = InputEffect.InputUpdate(signal.value))
}
onEffect<ColorPickerEffect.BackgroundColorUpdate> { effect ->
validate(colorCheckpoint, effect.color)
holder.update { it.copy(backgroundColor = effect.color) }
}
}
}
class ColorPickerTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<ColorPickerSignal.ColorSelected> { signal ->
// Respond to color selection
publish(ColorPickerEffect.BackgroundColorUpdate(signal.color))
}
onEffect<ColorPickerEffect.SelectedColorUpdate> { effect ->
// Handle incoming color update from InputTransformer
processSelectedColor(effect.color)
}
}
}
Advanced Communication Patterns
Request-Response Pattern
class ServiceTransformer : Transformer() {
private val pendingRequests = mutableMapOf<String, String>()
override val handlers: Handlers = handlers {
onEffect<ServiceRequestEffect> { effect ->
val requestId = generateRequestId()
pendingRequests[requestId] = effect.requesterId
// Process request
val result = processServiceRequest(effect.request)
// Send response back to requester
publish(ServiceResponseEffect(
requestId = requestId,
response = result,
targetTransformerId = effect.requesterId
))
}
}
}
class ClientTransformer(private val transformerId: String) : Transformer() {
override val handlers: Handlers = handlers {
onSignal<RequestServiceSignal> { signal ->
// Send request to service
publish(ServiceRequestEffect(
request = signal.request,
requesterId = transformerId
))
}
onEffect<ServiceResponseEffect> { effect ->
if (effect.targetTransformerId == transformerId) {
// Handle response
send(ServiceResponseReceived(effect.response))
}
}
}
}
Pipeline Pattern
class PipelineStage1Transformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<StartPipelineSignal> { signal ->
val stage1Result = processStage1(signal.input)
publish(Stage1CompleteEffect(stage1Result))
}
}
}
class PipelineStage2Transformer : Transformer() {
override val handlers: Handlers = handlers {
onEffect<Stage1CompleteEffect> { effect ->
val stage2Result = processStage2(effect.data)
publish(Stage2CompleteEffect(stage2Result))
}
}
}
class PipelineStage3Transformer : Transformer() {
override val handlers: Handlers = handlers {
onEffect<Stage2CompleteEffect> { effect ->
val finalResult = processStage3(effect.data)
send(PipelineCompleted(finalResult))
}
}
}
Mediator Pattern
class MediatorTransformer : Transformer() {
override val handlers: Handlers = handlers {
onEffect<ComponentAEvent> { effect ->
// Mediate between Component A and Component B
val transformedData = transformDataForB(effect.data)
publish(ComponentBCommand(transformedData))
}
onEffect<ComponentBEvent> { effect ->
// Mediate between Component B and Component A
val transformedData = transformDataForA(effect.data)
publish(ComponentACommand(transformedData))
}
onEffect<ComponentACommand> { effect ->
// Forward command to Component A
publish(ProcessComponentAEffect(effect.data))
}
onEffect<ComponentBCommand> { effect ->
// Forward command to Component B
publish(ProcessComponentBEffect(effect.data))
}
}
}
Error Handling in Communication
Safe Computation Calls
class SafeCommunicationTransformer : Transformer() {
override val handlers: Handlers = handlers {
onSignal<SafeQuerySignal> {
try {
val result = compute(DataProviderTransformer.dataContract)
send(QuerySuccessful(result))
} catch (e: Exception) {
send(QueryFailed("Failed to compute data: ${e.message}"))
execute(LoggingTransformer.logErrorContract, e.message)
}
}
}
}
Circuit Breaker Pattern
class CircuitBreakerTransformer : Transformer() {
private val circuitState = dataHolder(
initialValue = CircuitState.Closed,
contract = circuitStateContract
)
override val handlers: Handlers = handlers {
onSignal<CallExternalServiceSignal> { signal ->
val currentState = circuitState.getValue()
when (currentState) {
is CircuitState.Closed -> {
try {
val result = callExternalService(signal.request)
send(ServiceCallSuccessful(result))
} catch (e: Exception) {
circuitState.update { CircuitState.Open(System.currentTimeMillis()) }
send(ServiceCallFailed("Service unavailable"))
}
}
is CircuitState.Open -> {
if (System.currentTimeMillis() - currentState.openedAt > CIRCUIT_TIMEOUT) {
circuitState.update { CircuitState.HalfOpen }
// Retry the call
onSignal(signal)
} else {
send(ServiceCallFailed("Circuit breaker is open"))
}
}
is CircuitState.HalfOpen -> {
try {
val result = callExternalService(signal.request)
circuitState.update { CircuitState.Closed }
send(ServiceCallSuccessful(result))
} catch (e: Exception) {
circuitState.update { CircuitState.Open(System.currentTimeMillis()) }
send(ServiceCallFailed("Service still unavailable"))
}
}
}
}
}
}
Best Practices
1. Use Appropriate Communication Methods
// Use Effects for asynchronous notifications
publish(UserLoggedInEffect(user.id))
// Use Computations for data queries
val userData = compute(UserTransformer.getCurrentUserContract)
// Use Executions for fire-and-forget operations
execute(AuditTransformer.logActionContract, action)
2. Design Clear Interfaces
// Good - clear, specific contracts
object OrderContracts {
val getCurrentOrder = Contract.computation<Order?>()
val calculateOrderTotal = Contract.computationWithArgs<OrderItems, BigDecimal>()
val validateOrder = Contract.computationWithArgs<Order, ValidationResult>()
val saveOrder = Contract.executionWithArgs<Order>()
}
// Avoid - vague, generic contracts
val dataContract = Contract.computation<Any>()
val processContract = Contract.computationWithArgs<Any, Any>()
3. Handle Communication Failures
override val handlers: Handlers = handlers {
onSignal<QueryDataSignal> {
try {
val data = compute(DataProviderTransformer.dataContract)
send(DataQuerySuccessful(data))
} catch (e: Exception) {
send(DataQueryFailed("Communication failed: ${e.message}"))
execute(LoggingTransformer.logErrorContract, "Query failed: ${e.message}")
}
}
}
4. Maintain Loose Coupling
// Good - communicates through contracts
val userData = compute(UserContracts.getCurrentUser)
// Avoid - direct references
val userData = userTransformer.getCurrentUser() // Direct coupling
5. Document Communication Flows
/**
* Order Processing Flow:
* 1. OrderTransformer receives CreateOrderSignal
* 2. Publishes ValidateOrderEffect
* 3. ValidationTransformer validates and publishes ProcessPaymentEffect
* 4. PaymentTransformer processes payment and publishes SaveOrderEffect
* 5. OrderTransformer saves order and sends OrderCreatedData
*/
class OrderTransformer : Transformer() {
// Implementation...
}