15.5.13

Service Activator in Spring Integration

Travel of Software Developer: Understanding Service Activator

This post continues that topic and focuses on Spring Integration's implementation.

How Does Spring Integration Describe Its Service Activator?


  • A Service Activator is a generic endpoint for connecting a service instance to the messaging system.
That's great!
  • A generic endpoint.
  • Connecting a service instance.
  • To the messaging system.

A Generic Endpoint

As I mentioned in the previous post, Service Activator is often abused because it is so generic. Does the description here confirm that problem? I am not sure.
  • The input Message Channel must be configured, and if the service method to be invoked is capable of returning a value, an output Message Channel may also be provided.
    • The output channel is optional, since each Message may also provide its own "Return Address" header. This same rule applies for all consumer endpoints.
This matches the Pattern characteristics in EIP:
  • A Service Activator can be one-way (request only) or two-way (Request-Reply).
  • The service can be as simple as a method call - synchronous and non-remote - perhaps part of a Service Layer.
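To make the one-way versus two-way rule concrete, here is a minimal plain-Java sketch of how a reply target could be resolved. It does not use Spring Integration at all: SketchMessage, SketchChannel, and SketchServiceActivator are hypothetical stand-ins invented for this illustration, and the header key is only assumed to play the role of the "Return Address" header.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are NOT Spring Integration types.
final class SketchMessage {
    final Object payload;
    final Map<String, Object> headers;

    SketchMessage(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

final class SketchChannel {
    final List<Object> received = new ArrayList<>();

    void send(Object payload) {
        received.add(payload);
    }
}

final class SketchServiceActivator {
    // Assumed header key that plays the role of the "Return Address" header.
    static final String REPLY_CHANNEL = "replyChannel";

    private final Function<Object, Object> service; // the service method to invoke
    private final SketchChannel outputChannel;      // optional, may be null

    SketchServiceActivator(Function<Object, Object> service, SketchChannel outputChannel) {
        this.service = service;
        this.outputChannel = outputChannel;
    }

    void handle(SketchMessage request) {
        Object reply = service.apply(request.payload);
        if (reply == null) {
            return; // one-way: the service produced nothing, so there is nothing to send
        }
        // Two-way: prefer the configured output channel, else fall back to the return address.
        SketchChannel target = outputChannel;
        if (target == null) {
            target = (SketchChannel) request.headers.get(REPLY_CHANNEL);
        }
        if (target == null) {
            throw new IllegalStateException("no output channel or return address available");
        }
        target.send(reply);
    }
}
```

The point of the sketch is only the lookup order: a service that returns nothing needs no output channel, and a service that returns a value needs either a configured channel or a return address carried by the message.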

More Information about Service Activator in Spring Integration

Implementation of Service Activator in Spring Integration


This is the factory bean used to create the Service Activator bean. It is implemented very differently from GatewayProxyFactoryBean, and I don't know why.
GatewayProxyFactoryBean descends from IntegrationObjectSupport, AbstractEndpoint, and AbstractPollingEndpoint, which makes sense for a Messaging Endpoint. Service Activator is also a Messaging Endpoint, yet its factory bean derives from AbstractSimpleMessageHandlerFactoryBean<H> and then AbstractStandardMessageHandlerFactoryBean. That also makes sense from the factory bean's point of view. The real handler for Service Activator is ServiceActivatingHandler.

Service Activator can also use SpEL:
  • Since Spring Integration 2.0, Service Activators can also benefit from SpEL.
However, I am going to focus only on ServiceActivatingHandler for now.

ServiceActivatingHandler

@Override
protected Object handleRequestMessage(Message<?> message) {
    try {
        return this.processor.processMessage(message);
    }
    catch (Exception e) {
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new MessageHandlingException(message, "failure occurred in Service Activator '" + this + "'", e);
    }
}

This is how ServiceActivatingHandler is implemented. But wait, I don't see anything asynchronous here! And MessageProcessor<T> is new to me. Let's take a look at how this MessageProcessor is constructed.

MessageProcessor<T> and MethodInvokingMessageProcessor<T>

@Override
MessageHandler createMethodInvokingHandler(Object targetObject, String targetMethodName) {
    ServiceActivatingHandler handler = (StringUtils.hasText(targetMethodName))
            ? new ServiceActivatingHandler(targetObject, targetMethodName)
            : new ServiceActivatingHandler(targetObject);
    return this.configureHandler(handler);
}

So, this is as simple as a constructor.

public ServiceActivatingHandler(final Object object) {
    this(new MethodInvokingMessageProcessor<Object>(object, ServiceActivator.class));
}

The essential part is MethodInvokingMessageProcessor<Object>, whereas in Gateway the corresponding class is MethodInvocationGateway, which extends MessagingGatewaySupport.

private final MessagingMethodInvokerHelper<T> delegate;

public T processMessage(Message<?> message) {
    try {
        return delegate.process(message);
    }
    catch (Exception e) {
        throw new MessageHandlingException(message, e);
    }
}

MessagingMethodInvokerHelper<T>, The Delegate

I don't want to look into this class right now, because I believe it just finds the right method and calls it through basic method reflection. If I find out later that I am wrong, I will come back to it.
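As a rough illustration of what "find the method and call it by reflection" could look like, here is a plain-JDK sketch. It is not the MessagingMethodInvokerHelper code, which I have not examined. The @SketchActivator annotation, SketchMethodInvoker, and SketchGreeter are hypothetical names made up for this example, and the single-parameter rule is an assumption that keeps the lookup simple.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for @ServiceActivator.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchActivator {
}

final class SketchMethodInvoker {
    private final Object target;
    private final Method method;

    SketchMethodInvoker(Object target) {
        this.target = target;
        this.method = findAnnotatedMethod(target.getClass());
    }

    // Assumption: exactly one annotated method that takes a single argument.
    private static Method findAnnotatedMethod(Class<?> type) {
        Method found = null;
        for (Method candidate : type.getDeclaredMethods()) {
            if (candidate.isAnnotationPresent(SketchActivator.class)
                    && candidate.getParameterCount() == 1) {
                if (found != null) {
                    throw new IllegalArgumentException("more than one candidate method on " + type);
                }
                found = candidate;
            }
        }
        if (found == null) {
            throw new IllegalArgumentException("no candidate method on " + type);
        }
        found.setAccessible(true);
        return found;
    }

    Object process(Object payload) {
        try {
            return method.invoke(target, payload);
        }
        catch (InvocationTargetException e) {
            // Surface the exception thrown by the service method itself.
            throw new IllegalStateException("service method failed", e.getCause());
        }
        catch (IllegalAccessException e) {
            throw new IllegalStateException("service method not accessible", e);
        }
    }
}

// Hypothetical service used to exercise the invoker.
final class SketchGreeter {
    @SketchActivator
    String greet(String name) {
        return "Hello, " + name;
    }
}
```

With this sketch, new SketchMethodInvoker(new SketchGreeter()).process("Spring") looks up greet once in the constructor and then invokes it for each payload.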

@Async and Annotation Driven Task Executor

  • To enable both @Scheduled and @Async annotations, simply include the 'annotation-driven' element from the task namespace in your configuration.
This is the way to support an asynchronous Service Activator with a Direct Channel.

Implementation of Service Activator with @Async 


Message Return from ServiceActivator with @Async

In the above example, I used a one-way ServiceActivator. This is a very natural way to use Service Activator. You can inject the next channel into the ServiceActivator so that it can send or receive messages flexibly. I think this is the so-called Half-Sync/Half-Async pattern, though I am not sure.
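The idea of a one-way service that pushes its own output into an injected channel can be sketched with plain JDK concurrency. This is only an analogy under stated assumptions: the ExecutorService plays the role of the @Async task executor, the BlockingQueue plays the role of the injected channel, and SketchOneWayService and SketchOneWayDemo are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical one-way service: handle() returns nothing, output goes to the injected queue.
final class SketchOneWayService {
    private final ExecutorService executor;          // stands in for the @Async task executor
    private final BlockingQueue<String> nextChannel; // stands in for the injected channel

    SketchOneWayService(ExecutorService executor, BlockingQueue<String> nextChannel) {
        this.executor = executor;
        this.nextChannel = nextChannel;
    }

    void handle(final String payload) {
        // The caller returns immediately; the work and the send happen on the executor thread.
        executor.execute(() -> nextChannel.offer(payload.toUpperCase()));
    }
}

final class SketchOneWayDemo {
    static String run(String payload) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<String> nextChannel = new LinkedBlockingQueue<>();
        try {
            new SketchOneWayService(executor, nextChannel).handle(payload);
            // A downstream consumer would normally read from the channel; here we poll once.
            return nextChannel.poll(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            executor.shutdown();
        }
    }
}
```

Because the service sends its own output, nothing depends on the return value of the asynchronous method.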

However, sometimes we want some output from the ServiceActivator. In other words, we want a two-way ServiceActivator. That is not a problem with a synchronous Service Activator, but it is with @Async. If you do it the normal way, such as:

@Async
@ServiceActivator
public Message<Object> handleMessage(Message<String> msg)

You won't get anything as its output. Why?

AnnotationAsyncExecutionInterceptor.invoke(MethodInvocation)

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Future<?> result = this.determineAsyncExecutor(invocation.getMethod()).submit(
            new Callable<Object>() {
                public Object call() throws Exception {
                    try {
                        Object result = invocation.proceed();
                        if (result instanceof Future) {
                            return ((Future<?>) result).get();
                        }
                    }
                    catch (Throwable ex) {
                        ReflectionUtils.rethrowException(ex);
                    }
                    return null;
                }
            });
    if (Future.class.isAssignableFrom(invocation.getMethod().getReturnType())) {
        return result;
    }
    else {
        return null;
    }
}

This is where @Async is handled. As you can see, the result is kept only if the method's declared return type is Future<T> or one of its subtypes. Otherwise, the result is simply discarded.
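The effect is easy to reproduce without Spring. The following plain-JDK sketch mirrors the shape of the quoted logic; it is a simplified imitation, not the interceptor itself. SketchAsyncInterceptor and SketchAsyncDemo are hypothetical names, the declared return type is passed in by hand instead of being read from a MethodInvocation, and CompletableFuture.completedFuture stands in for AsyncResult.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class SketchAsyncInterceptor {
    // Always submits the call, but hands the Future back to the caller only
    // when the declared return type is itself a Future.
    static Object invoke(ExecutorService executor, Class<?> declaredReturnType,
            final Callable<Object> target) {
        Future<Object> submitted = executor.submit(new Callable<Object>() {
            public Object call() throws Exception {
                Object result = target.call();
                if (result instanceof Future) {
                    return ((Future<?>) result).get(); // unwrap the method's own Future
                }
                return null; // any non-Future result is dropped here
            }
        });
        return Future.class.isAssignableFrom(declaredReturnType) ? submitted : null;
    }
}

final class SketchAsyncDemo {
    // Returns { what the caller sees for a plain return type,
    //           what the caller can get for a Future return type }.
    static Object[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object plain = SketchAsyncInterceptor.invoke(executor, String.class, () -> "reply");
            Object wrapped = SketchAsyncInterceptor.invoke(executor, Future.class,
                    () -> CompletableFuture.completedFuture("reply"));
            Object unwrapped = ((Future<?>) wrapped).get(5, TimeUnit.SECONDS);
            return new Object[] { plain, unwrapped };
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
    }
}
```

In this sketch the caller of the method declared to return String sees null, while the caller of the method declared to return Future can still retrieve "reply". Applied to the Service Activator, this means declaring the annotated method with a Future return type: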

@Async
@ServiceActivator
public Future<Message<Object>> handleMessage(Message<String> msg) throws Exception {
    return new AsyncResult<Message<Object>>(XXXX);
}

For a complete example, please see this.

But how can the Service Activator pass the returned value down to the next handlers? I don't think there is an easy solution. That may be why you won't find any Spring Integration documentation telling you to use @Async with a Service Activator.

Service Activator with QueueChannel



