In a previous post I introduced event activities in Windows Workflow Foundation and what were the basics of how to create one. Now that we know which interfaces we need to implement, let's see now how to actually implement one.
In a regular activity, the basic lifetime is fairly simple: you would basically override the Execute() method, do your work and return ActivityExecutionStatus.Closed. If you needed to do your work asynchronously, you'd return ActivityExecutionStatus.Executing, and then eventually report your status as Closed. For asynchronous activities, you'd probably want to override Cancel(), as well. Of course, let us not forget Initialize() which will be the first thing to execute.
The same sequence of events applies to an Event Activity. Almost. This is because there are other methods that are called interleaved with the regular Activity methods which are important. Furthermore, you ideally want your activity to be able to execute in a regular scenario as well as inside an EventDrivenActivity, while reusing as much of your code as possible between the two possible execution paths.
The basic sequence of things for an Event Activity inside an EventDrivenActivity is a little bit different. Here's a quick review of the workflow that occurs:
- Initialize() is called on your activity, as usual
- IEventActivity.Subscribe() is called on your activity by your parent EventDrivenActivity
- Your parent EventDrivenActivity is notified of your event through its IEventActivityListener
.OnEvent implementation.
- IEventActivity.Unsubscribe() is called on your activity by your parent EventDrivenActivity
- Execute() is called
Let us look now then at what you would do in each of these steps to ensure your Event Activity works and that you can reuse most of the code in both scenarios:
1 When Initialize() is called, do anything you need to initialize your activity. This is also a good place to generate a unique name for the WorkflowQueue you'll use later to communicate with your runtime service (a GUID is a good choice).
2 Your parent EventDrivenActivity subscribes to events on your activity's workflow queue by calling IEventActivity.Subscribe() on your instance. This means that it will be your parent activity which will get notified when something happens on the WorkflowQueue instance that you created to communicate with your runtime service, and not your own activity. This is the reason why IEventActivityListener
Normally, here all you would do is ask your runtime service to do an operation from you and to put the result into a workflow queue you created for this piurpose. You'd then subscribe your parent activity in your Subscribe() implementation to the QueueItemAvailable event of your workflow queue, by using WorkflowQueuingService.RegisterForQueueItemAvailable(). Here's some example code out of my MsmqReceiveActivity:
void IEventActivity.Subscribe(ActivityExecutionContext parentContext,
IActivityEventListener<QueueEventArgs> parentEventHandler)
{
if ( parentContext == null )
throw new ArgumentNullException("parentContext");
if ( parentEventHandler == null )
throw new ArgumentNullException("parentEventHandler");
TraceUtil.WriteInfo("MsmqReceiveActivity::Subscribe({0})",
parentContext.Activity.Name);
WorkflowQueue queue = GetWorkflowQueue(parentContext);
queue.RegisterForQueueItemAvailable(parentEventHandler, QualifiedName);
MsmqListenerService msmqSvc =
parentContext.GetService<MsmqListenerService>();
_subscriptionID = msmqSvc.Subscribe(_wfQueueName, Queue);
}
Notice how we pass our parent’s event handler to RegisterQueueItemAvailable() instead of our own.
3 Now, things are completely left to your runtime service, which must wait for the external event to happen. Once the external event occurs, it must enqueue a notification into the WorkflowQueue instance you provided to it. This usually means that at the very least your runtime service needs to know which events are of interest to which workflow instances, and possibly do much more (for example, if you implement correlation, this is where you’ll need to figure out which correlations are met by an incoming event).
In the case of my MsmqReceiveActivity, the runtime service is fairly simple: It basically keeps a dictionary of workflow instances (activities) that are interested in messages from a given MSMQ queue. When a message arrives at said MSMQ queue, it extracts the data from the message and then writes it into the WorkflowQueue provided when the activity “subscribed” to the queue.
4 When the actions on the WorkflowQueue you requested notification for happen (i.e. when your runtime service has posted an actual event to the queue), your parent EventDrivenActivity will get notified of this; however, it will not actually remove the event data from the WorkflowQueue.
Now, your parent activity will unsubscribe for your activity's workflow queue events by calling IEventActivity.Unsubscribe() on your activity, since it already has received the notification it was waiting for. Here you would typically just ask your runtime service to stop notifying you of events, and then unregister from the WorkflowQueue events by calling WorkflowQueuingService.UnregisterForQueueItemAvailable() (or whatever action suits your).
However, it is very important that here you do not delete the workflow queue; you'll need it in the next step. Here’s some sample code:
void IEventActivity.Unsubscribe(ActivityExecutionContext parentContext,
IActivityEventListener<QueueEventArgs> parentEventHandler)
{
if ( parentContext == null )
throw new ArgumentNullException("parentContext");
if ( parentEventHandler == null )
throw new ArgumentNullException("parentEventHandler");
TraceUtil.WriteInfo("MsmqReceiveActivity::Unsubscribe({0})",
parentContext.Activity.Name);
//
// Tell our runtime Service that we are not interested
// in messages arriving at the MSMQ Queue anymore.
//
MsmqListenerService msmqSvc = (MsmqListenerService)
parentContext.GetService(typeof(MsmqListenerService));
msmqSvc.Unsubscribe(_subscriptionID);
_subscriptionID = Guid.Empty;
WorkflowQueue queue = GetWorkflowQueue(parentContext);
queue.UnregisterForQueueItemAvailable(parentEventHandler);
}
5 Now your parent EventDrivenActivity will actually proceed to request the WorkflowRuntime to execute your activity, which will result in your Execute() method getting called. What you'll do here is just get the event data out of your WorkflowQueue (this is why you can't delete it in the previous step!) and process it. Once you're done, you'll want to delete your workflow queue, and let the runtime now you're done executing.
Since we want to reuse our code as much as possible, you might be wondering that this is obviously not going to work when you are not in an EventDrivenActivity, and you’d be right. This is why you want to do a few tests first. To ensure the code works in both scenarios, you’ll want to implement your Execute() a little differently. Here’s how:
- Check to see if you’ve already created a WorkflowQueue. If so, check to see if there are any pending notifications on it (by simply checking how many items the queue has). If it has one, great, you were inside an EventDrivenActivity and your event was actually called. Just get the event data out and process it.
- If there are no pending notifications in the queue, then subscribe to yourself, and tell the workflow runtime that you’re not finished executing. This, in essence, means calling your own IEventActivity.Subscribe() implementation, passing your implementation of IActivityEventListener
as the second argument. Now, when the actual event fires, your activity will get notified and you can, again, simply process the data from the WorkflowQueue and let the runtime you’re done executing!
protected override ActivityExecutionStatus Execute(ActivityExecutionContext context)
{
if ( context == null )
throw new ArgumentNullException("context");
TraceUtil.WriteInfo("MsmqReceiveActivity::Execute()");
//
// If there are messages pending in the WorkflowQueue
// it means we are inside an EventDrivenActivity, since we
// previously subscribed to the msmq queue as requested by
// our parent activity. We can just process it and exit.
//
// If not, then we reuse our existing mechanism by subcribing
// to messages (thus requesting the MsmqListenerService to
// monitor the queue), and waiting for the notification to
// end. We appear to execute asynchronously to the runtime
// because of this.
//
if ( ProcessMessageFromQueue(context) )
{
return ActivityExecutionStatus.Closed;
}
((IEventActivity)this).Subscribe(context, this);
_activitySubscribed = true;
return ActivityExecutionStatus.Executing;
}
void IActivityEventListener<QueueEventArgs>.OnEvent(object sender, QueueEventArgs e)
{
//
// An event has been triggered at our workflow queue.
// However, this is only triggered if the direct subscriber
// to the event is ourselves (and not our parent activity), which
// only happens for us when running outside of an EventDrivenActivity.
//
TraceUtil.WriteInfo("MsmqReceiveActivity::OnEvent() - {0}", ExecutionStatus);
if ( ExecutionStatus == ActivityExecutionStatus.Executing )
{
ActivityExecutionContext context = (ActivityExecutionContext)sender;
if ( ProcessMessageFromQueue(context) )
{
context.CloseActivity();
}
}
}
private bool ProcessMessageFromQueue(ActivityExecutionContext context)
{
WorkflowQueuingService queueSvc = (WorkflowQueuingService)
context.GetService(typeof(WorkflowQueuingService));
//
// If the workflow queue exists, and it contains an event
// then we extract it and process it. It should contain the
// MessageDataEventArgs sent by the MsmqListenerService
// when it received the message from the MSMQ Queue.
//
if ( queueSvc.Exists(_wfQueueName) )
{
WorkflowQueue queue = queueSvc.GetWorkflowQueue(_wfQueueName);
if ( queue.Count > 0 )
{
MessageDataEventArgs data = (MessageDataEventArgs)queue.Dequeue();
OnMessageReceived(this, data);
if ( _activitySubscribed )
{
((IEventActivity)this).Unsubscribe(context, this);
}
queueSvc.DeleteWorkflowQueue(_wfQueueName);
return true;
}
}
return false;
}
As you can see here, if your activity is not inside an EventDrivenActivity, it should basically execute in an asynchronous fashion. This is logical, if you consider that despite it being executed by the workflow runtime at one point in time, your activity’s purpose is still to wait for an external event
Other things to consider
There are a few other things you’ll want to take into account when implementing Event Activities:
- If your activity’s execution is canceled, then you’ll want to cancel any outstanding subscriptions you have with your runtime service.
- If your Event Activity is located as one branch of a ListenActivity, and the other branch event is fired first, your activity instance will not be canceled. Instead, all that will happen is that the parent EventDrivenActivity will unsubscribe from you. I’m not sure why Cancel() is not called in this scenario (it seems like the logical thing to do, to me); it might be because since your activity’s Execute() has never been called, and thus there should be nothing to Cancel. However, I do thing this might lead to problems because there is no clear finalization point here, which might mean that you might miss an opportunity to do early clean up of allocated resources (like your workflow queue).
Furthermore, it seems to me like a lot of this is basically boilerplate code, so I do believe there is space here to implement helper classes that could simplify the creation of robust Event Activities.