DOI: 10.1147/sj.472.0237
Toward scalable real-time messaging
by D. Bauer, L. Garcés-Erice, S. Rooney, and P. Scotton
Conventional messaging technologies have been designed for large transactional systems, making the prediction and calibration of their delay impractical. In this paper, we present a minimal messaging system, implemented in Java™, that is designed to enable the analysis, modeling, and calibration of the expected performance of these technologies. We describe the algorithms and protocols that underlie this messaging system, show how an analysis can be performed, and give the actual measured performance figures. We show that, in the test environment, the system achieves a throughput of more than 100,000 messages per second with a maximum latency of less than 120 milliseconds. At 10,000 messages per second, a maximum latency of 5 milliseconds is measured. The algorithms make use of lock-free data structures, which allow the throughput to scale on multi-core systems.
As the physical world becomes increasingly connected to computer networks through intelligent sensors and actuators, the number of real-time applications that can take advantage of sensor information to make decisions and actuate physical systems grows. Consequently, physical systems can now be monitored and changed by applications residing in the enterprise computing environment at extremely brief time intervals.
Messaging systems are very attractive in such an environment, as they promote a data-centric form of communication in which the interests of entities determine the associations between them. This allows an entity at the application level to be oblivious to which other entities it is communicating with, permitting a much looser coupling than in an address-centric approach. Within a publish/subscribe messaging system, areas of interest are divided into named topics. Publishers publish messages on topics and subscribers subscribe to them. Messaging systems are commonly used in the enterprise computing environment; their extension to entities with stringent timing requirements provides a homogeneous end-to-end communication infrastructure. Real-time messaging systems present a class of problems for which the timeliness of a solution is part of the definition of its correctness.
Solutions to real-time problems are implemented either through over-provisioning or by using a real-time system. Over-provisioning involves purchasing network and computer resources that are seldom or never used. Designing and building a real-time system is costly because all possible execution paths through all layers of the software architecture must be controlled and budgeted with respect to execution time. An over-provisioned system does not guarantee that a deadline will never be missed, only that it was not missed during the testing phase; in some cases, this is adequate. The choice between the two approaches is a trade-off between the cost of building the system and the cost of missing deadlines.
We have developed a messaging system called Tempo that can be used on either an over-provisioned or real-time architecture. Tempo is written in Java**. The basic principle underlying Tempo is to treat the messaging system as a set of independent queuing systems, one for each topic. A messaging scheduler partitions resources between these queuing systems such that each one gets a specified share.
Each topic has its own queue and effectively its own queuing server. The performance of the topic, in particular its delay, can then be addressed using simple queuing models. This alone is not sufficient to make the system real-time, because Tempo itself is executed by the thread scheduler of the operating system (OS). The ability to allocate resources to topics within the messaging scheduler is necessary but not sufficient for performing real-time messaging.
In order to use Tempo to build a real-time system, the entire messaging system is considered a periodic task within the real-time scheduling discipline of the underlying OS (e.g., the rate-monotonic1 discipline). To allow a reasonable worst-case execution time (WCET) analysis to be performed, the execution time of all activities in the programming environment needs to be known. This requires a real-time OS in which, for example, the time taken to handle cache misses and demand paging can be bounded. For languages like Java, with complex memory models, the memory bookkeeping tasks also need to be bounded. This can be achieved either by using extensions to the existing memory model (e.g., in the Real-time Specification for Java [RTSJ]2) or with a real-time “garbage collector,” such as Metronome.3 For complex systems a WCET analysis is difficult to perform, as all possible paths must be considered.
Such an analysis may be too conservative, because the worst cases of all sub-activities must be summed to give the WCET, even though the probability of all worst cases occurring simultaneously may be so low that it would not be expected to occur during the lifetime of the hardware. In such cases, a probabilistic WCET might be more appropriate.4
Tempo is structured internally as an event system. Messages can either be outgoing from local applications, in which case the handler function transmits them over the network, or incoming from the network, in which case the handler executes the associated application-specific function. Other events, such as the firing of timers, are also of use to applications within the messaging system, enabling, for example, the periodic transmission of messages without the need for application threads.
The remainder of this paper is organized as follows: In the following section we give a brief overview of the related work. We then summarize the Tempo protocol and its modes of operation. This is followed by a description of the scheduling system. Performance evaluation results are then presented, followed by our conclusions.
Existing commercial messaging systems have been designed with transactional applications in mind. IBM offers a series of such messaging products called WebSphere* MQ.5 In these systems, such as those made for financial transactions, reliability is the most important design goal. In consequence, they are inappropriate for applications that have stringent requirements of predictable low latency with high throughput.
Recently, IBM has developed a lightweight messaging protocol called MQ Telemetry Transport (MQTT6). MQTT is designed for use in sensor and actuator applications. While MQTT is much simpler than conventional MQ, enabling it to be implemented on very simple devices, it has no provision for bounding latency. IBM has also designed several innovative messaging systems. Gryphon7 uses a flow graph model to allow content-based messaging to scale to very large numbers of publishers and subscribers by distributing the filtering and aggregating functions over the distribution network. The Smart Middleware and Light Ends project (SMILE8) extends the Gryphon model by allowing the subscribers to make continuous queries (similar to Structured Query Language [SQL] queries) on the combined set of event streams coming from the publishers. Neither Gryphon nor SMILE is suitable for real-time applications.
Some attempts have been made to design real-time messaging systems. The Common Object Request Broker Architecture (CORBA**)9 offers an event service that supports a publish/subscribe event mechanism over the basic CORBA/IIOP** (Internet Inter-ORB Protocol) transport protocol. Real-time CORBA can be used to tune an upper bound on the time it takes to complete a distributed activity. It achieves this through the allocation of resources (e.g., thread priorities) across a distributed environment. The real-time CORBA Event Service,10 an extension to the CORBA event service, takes advantage of the real-time capabilities of CORBA and simplifies the communication protocol.
CORBA does not achieve our other design goals. It is a general-purpose distributed processing environment, not just a messaging system. This means that CORBA end systems have to support the complete set of APIs (application programming interfaces) required by the Object Management Group** (OMG**), a requirement which is not favorable to low-end devices. In addition, the requirement that standard CORBA protocols be used within the messaging service introduces unnecessary overhead. Furthermore, the many layers of software in CORBA make it difficult to analyze the performance of the resulting system. The need for an explicit event server supporting the publish/subscribe application necessitates an extra hop across which each message must be carried even if the publisher and subscriber are in the same broadcast or multicast domain. Finally, the use of explicit bindings in CORBA whereby publishers and subscribers explicitly connect and disconnect from the event server makes it unsuitable for environments where devices often fail and in which connectivity is often lost and reestablished.
Unlike CORBA, Data Distribution Service** (DDS**)11 is explicitly a publish/subscribe infrastructure. It does not define either the format of the messages transmitted over the network or the pattern of message exchange. DDS simply defines a set of APIs that are defined using CORBA IDL (Interface Definition Language). DDS implementations consequently do not necessarily interoperate with each other.
DDS offers some support for resource allocation. A publisher can specify the rate at which it wishes to publish and subscribers the rate at which they wish to receive messages, and DDS ensures their consistency. An end-to-end latency requirement may be associated with a message, but this is only a suggestion to the application, and DDS ignores it.
DDS has quite a rich set of operations describing different semantics for message delivery. In particular, DDS allows a data item to be associated with a key, thereby giving it an identity. Multiple updates for the same data may exist within DDS simultaneously, and the delivery semantics determine whether old ones are overwritten by new ones or are retained. Data items can be retained until explicitly removed by subscribers or may expire after a given time period. The mechanisms described in the following sections could be used to build a DDS system.
In a publish/subscribe system, publications concerning a topic are sent to all users who have subscribed to that topic. The publish/subscribe abstraction can be supported using a dedicated broker whose role is to dispatch published messages to the appropriate subscribers. Alternatively, the association between topics and subscribers can be maintained at the publishers, such that publishers send publications directly to subscribers. This peer-to-peer case incurs less latency because an indirection through a broker is avoided, but is only appropriate for moderate numbers of publishers and subscribers. For large numbers of publishers or subscribers, a large amount of state information needs to be maintained at the endpoints.
Tempo supports both modes of operation within the same publish/subscribe protocol, allowing the trade-off between latency and scalability to be determined by the user. A publisher periodically announces its ability to send messages on a named topic, and a subscriber periodically announces its readiness to receive messages on a named topic. Publishers and subscribers communicate across an IP-addressable channel. The role of the channel can be played by a broker, or by the broadcast or multicast domain. These two cases are transparent to the publishers and subscribers, and are configurable simply through an address—that is, in one case it is the address of the broker, in the other it is the broadcast or multicast address.
Publishers maintain the list of subscribers to their topic and send publications to these subscribers. Tempo uses a soft-state approach: if, after some time, a subscription announcement has not been received from a subscriber, the publisher removes the associated state for that subscriber entity and will not send any further messages to it. In the case of a broker, the broker acts as a proxy, appearing as a publisher to the subscribers and as a subscriber to the publishers. It aggregates information from multiple announcement messages, so that from the point of view of the endpoint, there appears to be only one subscriber or one publisher (i.e., the broker), although there may actually be many publishers and subscribers for a given topic.
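The soft-state approach can be illustrated with a minimal sketch. It assumes that a publisher records the time of the last announcement received from each subscriber and periodically discards stale entries; the class name, the method names, and the timeout parameter are hypothetical and are not taken from Tempo.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Soft-state subscriber table: entries expire unless refreshed by announcements. */
class SoftStateSubscribers {
    private final long timeoutMillis;   // hypothetical expiry interval
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    SoftStateSubscribers(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called whenever a subscription announcement arrives from a subscriber. */
    void onAnnouncement(String subscriberAddress, long nowMillis) {
        lastSeen.put(subscriberAddress, nowMillis);
    }

    /** Drops every subscriber whose last announcement is older than the timeout. */
    void expire(long nowMillis) {
        lastSeen.entrySet().removeIf(e -> nowMillis - e.getValue() > timeoutMillis);
    }

    /** Subscribers to which publications are currently sent. */
    Set<String> activeSubscribers() {
        return new HashSet<>(lastSeen.keySet());
    }
}
```

No explicit unsubscribe or failure-detection message is needed in this scheme: a subscriber that fails or loses connectivity simply stops refreshing its entry.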
Figure 1 illustrates the two modes of operation. The broker receives announcement messages from subscribers and relays them to the corresponding publishers and vice versa [Figure 1(A)]. This establishes the data path from publisher through the broker to the subscribers [Figure 1(C)]. The peer-to-peer mode uses the same approach. Here, announcement messages are sent on a broadcast or multicast domain [Figure 1(B)] and received by all endpoints. The publishers then establish the data path directly to the subscribers, as shown in Figure 1(D).
Figure 1
Due to the strict separation of control and data paths, the message format used on the data path can be kept very simple and therefore can be encoded and decoded efficiently. In particular, messages do not carry a topic name but only a four-byte message identifier. A distributed name service maps topic names to unique identifiers.
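To illustrate why such a format is cheap to encode and decode, the following sketch assumes a data-path message consisting of nothing more than the four-byte identifier followed by an opaque payload. The actual Tempo wire layout is not specified here, so the field order, the byte order, and the class name are assumptions made for illustration only.

```java
import java.nio.ByteBuffer;

/** Hypothetical data-path message: a four-byte identifier followed by the payload. */
class DataMessage {
    final int id;           // identifier obtained from the name service
    final byte[] payload;   // application data, opaque to the messaging layer

    DataMessage(int id, byte[] payload) {
        this.id = id;
        this.payload = payload;
    }

    /** Encoding is a single fixed-size header write plus a bulk copy. */
    byte[] encode() {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(id)
                .put(payload)
                .array();
    }

    /** Decoding reads the identifier and treats the remaining bytes as payload. */
    static DataMessage decode(byte[] wire) {
        ByteBuffer buffer = ByteBuffer.wrap(wire);
        int id = buffer.getInt();
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return new DataMessage(id, payload);
    }
}
```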
Large distributed games can be structured as publish/subscribe systems in which the interests of the players correspond to topics.11 In such a system, hundreds of thousands of players may be participating, so that the system potentially has to handle millions of messages per second. The acceptable end-to-end delay of distributed games is generally considered to be in the 150–200 ms range (occasional overruns are tolerable within the game). This, coupled with the fact that the network is not controllable by the game organizer, means that an over-provisioning approach is typically adopted. Tempo is being used to support the communication needs of a large virtual world developed within IBM.
Each of the elements in the publish/subscribe system (publisher, subscriber, and broker) contains the necessary infrastructure to have the messages delivered to the corresponding recipients (local or remote). Figure 2 shows an overview of this infrastructure: the messaging kernel. It is composed of queues associated with publishing applications, subscribing applications, and periodic timers. These queues are served by scheduling threads. In the case of a multi-core system, threads are distributed over the various cores. In this section, we present the details of this messaging kernel.
Figure 2
The purpose of the scheduler is to allow an application to control the time allocated to different activities within the scope of the messaging system. Activities controlled by the scheduler are called tasks. Not all activities are controllable by the scheduler; garbage collection by the Java Virtual Machine (JVM) is one example, and any thread running outside the context of the messaging system is likewise beyond the control of the scheduler. Such activities need to be controlled by other means, for example by using a real-time garbage collector.12 Within the messaging system, a task entity is created each time there is a registration of a publication (by a publisher), a subscription (by a subscriber), or a function to be executed by a periodic timer. A task entity contains a queue into which events are placed. In the case of outgoing messages, the thread executing the application code places the events into the queue. In the case of subscriptions, network threads place the events into the queue, whereas a dedicated timer thread places periodic callback (i.e., application-defined procedure) events into the appropriate queue. Within a given task, all events are dispatched in FIFO (first in, first out) order.
An application can allocate a share of the scheduling resources to a task. The share is expressed as a fraction of a given fixed time period T. That is, if a task is allocated a given fraction of the scheduling resources, it can run for at least that fraction of T every T time units. The share is used by the scheduler to determine which task should be scheduled next.
A weighted fair share (WFS) queuing discipline13 allows different service rates to be allocated to different queues according to some requested share. WFS was developed within the context of packet-switched networks to protect one network flow from another. It belongs to a set of virtual-time-based algorithms that provide fairness and delay bounds. Packet scheduling is distinct from event scheduling in that the time that an event takes to be serviced cannot be easily estimated (in a packet scheduler it is a simple function of the packet length). This means that the guarantees given by the event scheduler are necessarily less precise. Resources are shared among the event queues by allocating time slices called credits to their corresponding tasks within the scheduler. The scheduler always chooses to schedule the task with the largest number of credits that has a nonempty queue.
The credits of all tasks are reset when all tasks that have work to do have no credits left; i.e., the scheduler will always perform work if there is work to do. This property is commonly termed work-conserving. The total credits allocated to a task are a fraction of the time period T. For example, if T is 10 ms, the credits allocated to all topics sum to 10 ms. Each time a task is scheduled, the scheduler measures how long the scheduler thread runs before it completes. This amount is deducted from the total number of credits for that task. If some of the tasks with allocated credits have no work to do in a period, the duration of that period may be less than T, i.e., the amount of credits used before the credits are reset can be less than T.
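The credit mechanism can be sketched in a single-threaded form as follows. This is a simplified illustration rather than the Tempo implementation: each event reports the time it consumed (in microseconds) instead of being timed by the scheduler, a reset simply restores each task's share, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongSupplier;

/** Single-threaded sketch of credit-based, work-conserving event scheduling. */
class CreditScheduler {
    static final class Task {
        final String name;
        final long share;   // credits (microseconds) granted per period T
        long credits;       // credits remaining in the current period
        // Assumption: an event returns the time it took to run, in microseconds.
        final Queue<LongSupplier> events = new ArrayDeque<>();

        Task(String name, long share) {
            this.name = name;
            this.share = share;
            this.credits = share;
        }
    }

    private final List<Task> tasks = new ArrayList<>();

    Task addTask(String name, long share) {
        Task task = new Task(name, share);
        tasks.add(task);
        return task;
    }

    /** Dispatches one event; returns the name of the task served, or null if idle. */
    String runOnce() {
        Task best = pick();
        if (best == null) {
            return null;                       // no task has work
        }
        if (best.credits <= 0) {               // every task with work is out of credits
            for (Task t : tasks) {
                t.credits = t.share;           // reset, so work is always performed
            }
            best = pick();
        }
        long used = best.events.poll().getAsLong();
        best.credits -= used;                  // may go negative: a non-preemptive overrun
        return best.name;
    }

    /** The task with the most credits among those with a nonempty queue. */
    private Task pick() {
        Task best = null;
        for (Task t : tasks) {
            if (!t.events.isEmpty() && (best == null || t.credits > best.credits)) {
                best = t;
            }
        }
        return best;
    }
}
```

With T = 10 ms, shares of 7 ms and 3 ms, and events that each take 1 ms, the first ten calls to `runOnce()` serve the first task seven times and the second task three times; the eleventh call triggers a reset.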
We assume a non-preemptive threading model; i.e., a task may overrun its number of credits. This can only be recognized by the scheduler after the task has completed. We call such a task non-conformant. The entire system is conformant if it contains no non-conformant tasks. If the system is conformant, then no task can go for longer than a period T before being scheduled.
How a system reacts to non-conformance depends on the significance of missed deadlines. The system may raise an exception and carry out an investigation as to why a task is taking longer than expected (as in hard real-time systems); alternatively, the number of credits allocated to the non-conformant task may be reduced in the next time cycle. In the latter case, even if the system is non-conformant within time T, over a sufficiently long time, every task will still get its share of resources, as non-conformant tasks are penalized.
The share allocated to a task may not be sufficient to handle the total number of events that arrive for it—that is, the service rate may not be adequate to handle the arrival rate. The task is then called unstable. A task can be both conformant and unstable: in this case, the time it takes to handle an event is small compared with the period T, meaning that although the task never overruns, a backlog of events may build up.
By making the event queues of finite size, the effect of an unstable task on other tasks is minimized, since the event sender must eventually block, or events must be dropped, when the queue is full. A task whose event queue is frequently full is identified as an unstable task. While there is some probability of any arbitrarily large queue being full because of variations in service and arrival rate, the probability of this occurring can be analyzed if the expected service and arrival distributions are known. The system is stable if no task in the system is unstable. If the system is conformant and stable, no event will wait in a queue for longer than a period T before being scheduled.
The service rate of the queuing system is determined by the length of time that it takes to execute the event handler of the queue. For messages being published by the application, the event handler writes them to a socket. For messages being received from the network, the event handler is the subscriber's application-specific callback (i.e., the procedure invoked by this message). Ideally, these would have a constant rate, but in practice, they are variable. To reduce the variance, applications should perform as little processing as possible in the callback. In particular, they should not perform blocking operations (i.e., operations that do not complete until the operation either succeeds or fails).
For a messaging system running by itself on a single-processor system, let the average service rate of that system be μ. (The influence of a multiprocessor on shares is considered in the section “Scalability for multi-core architectures” and following.) If there are N tasks and Si is the share allocated to task i, then the guaranteed service rate of the jth task is given by
$$\mu_j = \mu \, \frac{S_j}{\sum_{i=1}^{N} S_i}$$
As the messaging system comprises a set of distinct queues, one for each topic, a simple analysis of the expected delays can be performed if the distributions of the service and message arrival rates are known. For example, assuming that the inter-arrival and service times are exponentially distributed such that μj and λj are respectively the average service and arrival rate for task j, the intensity of the task is given by ρj = λj /μj. The mean response time in the event system, i.e., the average time it takes for an event to be serviced, is given by14
$$E[r_j] = \frac{1/\mu_j}{1 - \rho_j} = \frac{1}{\mu_j - \lambda_j}$$
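As a worked illustration of this kind of analysis, the sketch below assumes that a task's guaranteed service rate is its share-weighted fraction of μ and that each topic behaves as an M/M/1 queue, for which the mean response time is the standard 1/(μj − λj). The class name, method names, and the numbers in the usage note are hypothetical.

```java
/** Sketch of the per-topic queuing model under M/M/1 assumptions. */
class TopicQueueModel {
    /** Guaranteed service rate of task j: its share-weighted fraction of mu. */
    static double guaranteedRate(double mu, double[] shares, int j) {
        double total = 0.0;
        for (double s : shares) {
            total += s;
        }
        return mu * shares[j] / total;
    }

    /** Mean M/M/1 response time in seconds; infinite if the task is unstable. */
    static double meanResponseTime(double muJ, double lambdaJ) {
        if (lambdaJ >= muJ) {
            return Double.POSITIVE_INFINITY;   // arrival rate exceeds service rate
        }
        return 1.0 / (muJ - lambdaJ);
    }
}
```

For example, with μ = 100,000 events per second and shares of 0.5, 0.3, and 0.2, the second task is guaranteed 30,000 events per second; at an arrival rate of 24,000 events per second (ρ = 0.8) its mean response time is 1/6,000 s, or roughly 0.17 ms.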
More generally, the distributions of arrival and service times are not known a priori and even if they are, they are generally not as well behaved as a Poisson process (i.e., a function that describes the number of random events in an interval of time or space). In these cases, calibration is used. For example, the average measured length of an event queue and the average measured arrival rate allow us to determine the waiting time in the queue from Little's law.15 Increasing the share allocated to the event queue will reduce the average waiting time, but in ways that depend on the arrival and service distribution and cannot be predicted unless these are known. The share allocated to a task may be increased until an adequate average waiting time is achieved or it is determined that the configuration is not feasible.
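As an illustration of such a calibration step, Little's law relates the average measured queue length and the average measured arrival rate to the average waiting time W. The numbers below are hypothetical and serve only to show the arithmetic.

```latex
W = \frac{\bar{L}}{\bar{\lambda}}, \qquad
\bar{L} = 12 \text{ events}, \quad
\bar{\lambda} = 24\,000 \text{ events/s}
\;\Longrightarrow\;
W = \frac{12}{24\,000}\ \text{s} = 0.5\ \text{ms}
```

If 0.5 ms exceeds the target for the topic, the share of the corresponding task is increased and the measurement is repeated.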
The scheduling function can be distributed over multiple processors to allow the throughput to scale. The threads that execute the scheduling function share state to maintain the schedule. Although it would be possible to use locks to synchronize this state, doing so would limit scalability whenever the time spent contending for locks was high relative to the time needed to schedule a task. For this reason, we use a lock-free approach in which multiple scheduling threads and writing threads can access the shared data structure without locking.
A task in Tempo is associated with an event queue. When a task is scheduled, a single event from this event queue is processed. The event queues are implemented as Michael-Scott16 lock-free FIFO queues. These queues allow readers and writers to concurrently access the same queue without the need for locking. While the algorithm described in Reference 16 may cause a write operation to fail if there is not enough room in the queue, by extending the algorithm trivially we can ensure that a write operation is blocked if the queue is full and then is awakened by the next reader.
Events either arrive over the network or are produced by an application. In both cases, a writer thread puts the events into the event queue. A task with available events is scheduled according to its share and a scheduler thread calls the event-processing routine of the task. A task remains schedulable as long as there are events in its queue. Writing threads and scheduler threads communicate through a “SchedulableTasks” data structure, which is described in detail in the next section. A writer thread adds a task to the SchedulableTasks structure after it has put the first event into the event queue of the task. At this point, the task is schedulable. Eventually, a scheduling thread removes the task from the SchedulableTasks data structure, reads its event queue, dispatches the event, updates the number of credits, and, if the queue is non-empty, places it back into the SchedulableTasks structure. Thus, while many scheduling threads can execute in parallel, at any given moment a given task is assigned to at most one scheduling thread. However, because of the lock-free nature of the task's event queue, a writer thread and a scheduler thread can operate concurrently on this queue.
All the information about the credits that a task has is maintained within the task itself. This information does not need to be protected from concurrent access because it is only updated by the thread that took the task out of SchedulableTasks.
Figure 3 shows the non-locking coordination that takes place between the writer and scheduler threads. In Figure 3(A), the writer thread recognizes that a task is not in SchedulableTasks and adds it, while simultaneously a scheduler thread accesses the same data structure and removes the task with the most credits. In Figure 3(B), the task is scheduled, and the writer thread adds events to the event queue while the scheduler thread removes the first one. In Figure 3(C), the scheduler thread puts the task back into SchedulableTasks after updating its credits, while the writer thread continues to put events in the queue of the task. In Figure 3(D), the scheduler thread has emptied the queue and does not return the task to SchedulableTasks; the next time the writer thread adds an event, it will notice this and perform the action described in Figure 3(A) again.
Figure 3
The scheduling thread removes the task from SchedulableTasks and returns it if there is still work to do. The writer thread needs to know whether the scheduler has returned the task to SchedulableTasks. This is enabled by a counter that maintains the number of events in the event queue of the task. This counter is incremented by the writer after having written an event and decremented by the scheduler thread after reading an event. The counter is based on the compare-and-swap (CAS) operation, which performs a comparison and a modification in one processor instruction. The use of the CAS operation allows the construction of atomic versions of the basic Java types, such that consistent concurrent modifications can be performed without locks.
The scheduler thread will only return the task to SchedulableTasks if the counter value is nonzero. A writer recognizes that a scheduler thread did not return a task to SchedulableTasks if the value before it succeeded in incrementing the atomic counter was zero.
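The counter handshake can be sketched as follows, using `AtomicInteger` for the CAS-based counter and `ConcurrentLinkedQueue` as a stand-in for the lock-free event queue of the task. The class and method names are hypothetical, and the insertion into and removal from SchedulableTasks is left to the caller.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of the writer/scheduler handshake based on an atomic event counter. */
class HandshakeTask {
    private final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    private final AtomicInteger pending = new AtomicInteger();   // events in the queue

    /**
     * Writer side: enqueue first, then increment the counter.
     * Returns true if the counter was zero beforehand, i.e. the task is not
     * in SchedulableTasks and the caller must add it.
     */
    boolean enqueue(Runnable event) {
        events.add(event);
        return pending.getAndIncrement() == 0;
    }

    /**
     * Scheduler side: called only by the single thread that currently holds
     * the task. Dispatches one event, then decrements the counter.
     * Returns true if events remain, i.e. the caller must put the task back.
     */
    boolean dispatchOne() {
        Runnable event = events.poll();
        event.run();
        return pending.decrementAndGet() > 0;
    }
}
```

Because a writer enqueues before incrementing and the scheduler polls before decrementing, a nonzero counter implies a nonempty queue, and exactly one party (the writer that moved the counter from zero, or the scheduler that left it nonzero) is responsible for placing the task into SchedulableTasks.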
The SchedulableTasks data structure is a priority queue in which priority is determined by the number of credits a task has (i.e., more credits mean a higher priority). This structure is accessed concurrently by writing threads and scheduling threads. In consequence, it must support concurrent access while still providing high performance. To this end, the number of credits that a task can have is quantized so that it falls into one of N buckets. Associated with each of these buckets is a lock-free FIFO queue. These buckets are placed in an array such that the bucket with the highest priority (i.e., the most credits) is at the extreme left and the priority decreases with each subsequent array element. The scheduler thread starts at the highest priority bucket and moves to the right until it finds a bucket with a nonempty queue. It then attempts to take the first task in this queue. If it succeeds, it dispatches this task; if not, it moves to the next element. If it reaches the end of the structure without finding work, it returns a null value. The precision of the share can be traded off against the efficiency of the system by increasing or decreasing the number of buckets.
The total number of operations required to identify the bucket into which a task should be added is constant, but the number of operations to find the first nonempty bucket increases linearly with the number of buckets N. The thread determines whether a bucket is nonempty by checking an atomic counter. The more buckets there are, the more accurate the scheduling decision, but more buckets also means a larger overhead in removing an element. Following the queuing model given in the subsection “Scheduling mechanism,” the time interval over which the shares are divided determines the maximum time that a task can go unscheduled, if the system is conformant. For example, if it is desired that no task go unscheduled for longer than 10 ms, the time period T must be set to 10 ms. If the system is both conformant and stable, no message can be delayed more than 10 ms. If it is desired to distinguish tasks at a time granularity of 100 microseconds, and T = 10 ms, we need 100 buckets. This in turn requires on average 50 integer comparisons each time a task is scheduled.
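A bucketed priority queue of this kind can be sketched as follows. The sketch omits bucket groups, uses `ConcurrentLinkedQueue` as the per-bucket lock-free FIFO queue, and maps credits to buckets linearly; the class name, the mapping, and the treatment of negative credits are assumptions made for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a quantized priority queue: index 0 holds the tasks with the most credits. */
class BucketPriorityQueue<T> {
    private final Queue<T>[] buckets;
    private final AtomicInteger[] sizes;   // per-bucket counters checked during the scan
    private final long maxCredits;         // the period T, in credit units

    @SuppressWarnings("unchecked")
    BucketPriorityQueue(int bucketCount, long maxCredits) {
        this.buckets = new Queue[bucketCount];
        this.sizes = new AtomicInteger[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            buckets[i] = new ConcurrentLinkedQueue<>();
            sizes[i] = new AtomicInteger();
        }
        this.maxCredits = maxCredits;
    }

    /** Constant-time mapping from credits to a bucket index. */
    int bucketFor(long credits) {
        long clamped = Math.max(0, Math.min(credits, maxCredits));
        int n = buckets.length;
        int fromRight = (int) Math.min(n - 1, clamped * n / maxCredits);
        return n - 1 - fromRight;
    }

    void put(T task, long credits) {
        int i = bucketFor(credits);
        buckets[i].add(task);
        sizes[i].incrementAndGet();
    }

    /** Scans from the highest-priority bucket; returns null if no task is found. */
    T take() {
        for (int i = 0; i < buckets.length; i++) {
            if (sizes[i].get() <= 0) {
                continue;                  // bucket appears empty, move right
            }
            T task = buckets[i].poll();
            if (task != null) {
                sizes[i].decrementAndGet();
                return task;
            }
        }
        return null;
    }
}
```

With 100 buckets and T = 10 ms expressed in microseconds, each bucket covers 100 microseconds of credit, matching the granularity in the example above.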
To reduce this overhead, buckets can be grouped into bucket groups of size M. An additional atomic counter is kept for the entire bucket group as well as for the individual buckets. The average number of comparisons is then $Y = M/2 + N/(2M)$. The optimal value of M for a fixed N is the value at which the derivative of Y with respect to M is zero, which is given by $M = \sqrt{N}$. The expected number of operations is then $\sqrt{N}$ and the worst case is $2\sqrt{N}$. For example, for 100 buckets, we expect, on average, 10 additional integer comparisons for removing a task, and 20 in the worst case.
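The choice of group size can be checked numerically. The following sketch evaluates Y = M/2 + N/(2M) for every candidate group size and returns the minimizer; the class and method names are hypothetical.

```java
/** Numerical check of the bucket-group size that minimizes the average scan cost. */
class BucketGroups {
    /** Average number of comparisons Y = M/2 + N/(2M) for n buckets in groups of m. */
    static double expectedComparisons(int n, int m) {
        return m / 2.0 + n / (2.0 * m);
    }

    /** Group size between 1 and n with the smallest expected number of comparisons. */
    static int bestGroupSize(int n) {
        int best = 1;
        for (int m = 2; m <= n; m++) {
            if (expectedComparisons(n, m) < expectedComparisons(n, best)) {
                best = m;
            }
        }
        return best;
    }
}
```

For N = 100, `bestGroupSize(100)` yields a group size of 10, for which `expectedComparisons(100, 10)` evaluates to 10, in agreement with the example above.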
As elements are simultaneously being added and removed from the priority queue, it is not the case that the “get” operation always removes the task with the highest priority in the queue. For example, it is possible that another thread adds a task to a bucket which the scheduler thread has already passed over. The priority queue is sequentially consistent according to Lamport's definition in Reference 17. A detailed analysis is given in Reference 18.
When no schedulable task has credit left, the scheduler resets the credits allocated to all tasks in SchedulableTasks. The scheduler recognizes that this has occurred when the task returned from SchedulableTasks has a credit of zero or less. The scheduler removes all tasks from SchedulableTasks, resets their credit as described in the subsection “Scheduling mechanism,” and puts them back. Multiple scheduler threads can identify the need for credit resetting and perform it in parallel. This is a consequence of the fact that a task can be removed by exactly one scheduler thread from the set and that all credit information about the task is contained within it.
However, it may be the case that one or more tasks with credit are currently being serviced by other scheduling threads when a given scheduling thread identifies that no task currently in SchedulableTasks has credit. The tasks in SchedulableTasks will then be reset and one or more of them scheduled, because of the work-conserving nature of the scheduler. A thread that was servicing a task while the credits were being reset by another thread needs to recognize that this has occurred and update the credit of this task before putting it back into SchedulableTasks. This is achieved using an atomic counter that is incremented after each reset. Each thread records the value of this counter before getting a task and checks whether it has changed before it puts the task back.
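The reset counter can be sketched as follows. How the credit of a task that missed a reset is updated is an assumption here: the sketch grants the task a fresh share and then deducts the time just consumed. All names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of detecting a concurrent credit reset with an atomic reset counter. */
class EpochAwareCredits {
    static final class Task {
        final long share;   // credits granted at each reset
        long credits;       // touched only by the thread currently holding the task

        Task(long share) {
            this.share = share;
            this.credits = share;
        }
    }

    private final AtomicInteger resetCount = new AtomicInteger();

    /** Value a scheduler thread records before taking a task. */
    int currentEpoch() {
        return resetCount.get();
    }

    /** Reset by a thread that has removed the given tasks from SchedulableTasks. */
    void reset(List<Task> heldTasks) {
        for (Task t : heldTasks) {
            t.credits = t.share;
        }
        resetCount.incrementAndGet();   // incremented after the reset
    }

    /** Called by the servicing thread before it puts the task back. */
    void settle(Task task, int epochAtTake, long usedCredits) {
        if (resetCount.get() != epochAtTake) {
            task.credits = task.share;  // assumption: a missed reset grants a fresh share
        }
        task.credits -= usedCredits;
    }
}
```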
Although the reading and writing of tasks to and from SchedulableTasks is lock-free, if a scheduling thread finds nothing in the priority queue, it sleeps until it is awakened by a writer thread. The wait call that the scheduler thread performs is executed in a loop whose condition is the emptiness of SchedulableTasks. This loop must be synchronized with respect to SchedulableTasks, and with the notification performed by the writer thread; otherwise notifications might be lost.
The behavior of a thread when it fails to obtain a lock is JVM-dependent. On some JVMs, the thread immediately suspends itself, while on others it may wait (“spin-wait”) for some time before suspending itself. The latter is preferable if the lock is not held for a long time. However, on both the IBM and Sun Microsystems JVM 5.0 and 6.0, we observe the former behavior; this may be because the implementation is not appropriate for our timing requirements. This has a major influence on performance on a multiprocessor machine, as many scheduler threads compete to obtain the lock to perform the test and all those that fail are suspended, although SchedulableTasks contains enough work for all of them to do. This unnecessary context switching of threads is extremely detrimental to performance.
To mitigate this problem in the absence of support for spin-waiting in the JVM, we perform the emptiness test a number of times without synchronization before executing the synchronized loop. The number of unsuccessful tests a thread performs before suspending itself is calibrated from the number of times it has successfully obtained work in the recent past. In this way, throughput is significantly increased.
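A minimal sketch of this spin-then-wait pattern is given below. The class `SpinThenWaitQueue` is hypothetical, and it uses a fixed `spinLimit` where the system described above calibrates the number of attempts from recent success; that simplification is an assumption of the sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Illustrative spin-then-wait queue (hypothetical, not the Tempo code). */
final class SpinThenWaitQueue<T> {
    private final ConcurrentLinkedQueue<T> items = new ConcurrentLinkedQueue<>();
    private final Object lock = new Object();
    private final int spinLimit; // fixed here; adaptive in the system described

    SpinThenWaitQueue(int spinLimit) {
        this.spinLimit = spinLimit;
    }

    /**
     * Writer side: the enqueue itself is lock-free. The notification is
     * sent while holding the lock so that it cannot fall between a
     * reader's emptiness test and its call to wait().
     */
    void put(T item) {
        items.offer(item);
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /**
     * Reader side: polls without synchronization up to spinLimit times,
     * then falls back to the synchronized wait loop.
     */
    T take() throws InterruptedException {
        for (int i = 0; i < spinLimit; i++) {
            T item = items.poll();
            if (item != null) {
                return item;
            }
        }
        synchronized (lock) {
            T item;
            while ((item = items.poll()) == null) {
                lock.wait();
            }
            return item;
        }
    }
}
```

When the queue usually holds work, a reader returns from the unsynchronized loop and never contends for the lock, which avoids the thread suspension described above.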
In the section “Scalability for multi-core architectures,” we described how resources are divided among applications by the allocation of shares to the scheduler threads that read from the associated event queues. In this subsection, we consider the writer threads in the context of this schedule. There are two types of writer threads: those executing application code and those managing sockets.
Application code can be executed in the context of a periodic timer. For example, a sensing application requests a function to be executed periodically. This function takes a reading and publishes the result using the messaging system. The time it takes to execute this function is subtracted from the credit allocated to the corresponding task.
Applications are free to create their own threads, and these threads may then write to the event queue of a task. In this case, the scheduler can only indirectly control the writer thread by forcing it to block when the associated event queue is full. The extent to which the task's share is respected in the actual system behavior depends on the amount of CPU utilization that is spent within the messaging system compared with that done in the application. For example, if there were two application threads, each publishing to a different topic, we would not expect the observed throughput to reflect any particular share if only a small percentage of the CPU is spent within the messaging system. In this case, the application must use other means, e.g., real-time priorities, to determine how to share resources appropriately between application threads.
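The indirect control exerted on an application thread by a full event queue can be sketched with a bounded queue. The class `BoundedEventQueue` is hypothetical, and `ArrayBlockingQueue` is used only for brevity; it is lock-based, unlike the lock-free structures described in this paper.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative bounded event queue (hypothetical, not the Tempo code). */
final class BoundedEventQueue<E> {
    private final BlockingQueue<E> events;

    BoundedEventQueue(int capacity) {
        events = new ArrayBlockingQueue<>(capacity);
    }

    /** Writer side: blocks the calling application thread while the queue is full. */
    void publish(E event) throws InterruptedException {
        events.put(event);
    }

    /** Writer variant that gives up after the timeout; returns false if the queue stayed full. */
    boolean tryPublish(E event, long timeoutMillis) throws InterruptedException {
        return events.offer(event, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Scheduler side: removes the next event, or returns null if the queue is empty. */
    E dispatch() {
        return events.poll();
    }
}
```

The writer makes progress only as fast as the scheduler dispatches events, which is the sole lever the scheduler has over threads that the application creates itself.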
In the case of the network, we extend the model described in the section “Coordination between writer threads and scheduler threads,” such that the event queue is not a Java queue, but the kernel socket buffer. The writer then has two components: the software interrupt that writes to a socket and a Java thread that recognizes that it has done so. This is achieved within Java by associating the task with a Java New I/O (NIO) channel in non-blocking mode. All such channels are monitored by a monitoring thread using the “NIO select” call. The schema remains the same as described in the section “Coordination between writer threads and scheduler threads,” except that the scheduler thread recognizes that a queue is no longer readable when the queue returns a null value.
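The following sketch shows the NIO calls involved, using a `java.nio.channels.Pipe` in place of a network socket so that it is self-contained. The class `NonBlockingDrain` and its method names are hypothetical. As assumptions of the sketch, one thread performs both the select and the reads, whereas the system described above separates the monitoring thread from the scheduler threads, and a read that returns no bytes plays the role of the null value.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Illustrative use of NIO select with non-blocking channels (not the Tempo code). */
final class NonBlockingDrain {

    /**
     * Waits until some registered channel is readable, then reads from
     * each readable channel until a read returns no bytes. Returns the
     * number of bytes read, or 0 if the select timed out.
     */
    static int drainReadable(Selector selector, long timeoutMillis) throws IOException {
        if (selector.select(timeoutMillis) == 0) {
            return 0;
        }
        int total = 0;
        ByteBuffer buffer = ByteBuffer.allocate(128);
        for (SelectionKey key : selector.selectedKeys()) {
            if (!key.isReadable()) {
                continue;
            }
            ReadableByteChannel channel = (ReadableByteChannel) key.channel();
            int n;
            while ((n = channel.read(buffer)) > 0) {
                total += n;
                buffer.clear(); // contents are discarded in this sketch
            }
        }
        selector.selectedKeys().clear();
        return total;
    }

    /** Writes a message into a pipe and drains it through the selector. */
    static int roundTrip(byte[] message) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);
            sink.write(ByteBuffer.wrap(message));
            int total = 0;
            while (total < message.length) {
                int n = drainReadable(selector, 1000);
                if (n == 0) {
                    break; // timed out without further data
                }
                total += n;
            }
            return total;
        }
    }
}
```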
The scheduler guarantees that when a scheduling thread executes, it will choose the available task that best fits the schedule (based on the priority queue behavior described previously). If we have a single-processor machine in which the scheduling thread is the only thread that runs, all tasks will always be available and the share allocated to the task will simply be a fraction of the CPU utilization.
In a real system, there are many other threads running. Within the messaging system alone, we have timer threads, threads monitoring the I/O, and threads supporting the control part of the Tempo protocol. The JVM itself typically runs several daemon threads, e.g., for garbage collection. All of this is in addition to everything else that runs on the machine, e.g., the application. In short, the scheduling threads are in reality not always running, and the share allocated to a task on a single-processor machine is a share of the fraction of the time the scheduler runs.
The situation is more complicated on a multiprocessor machine, because when a scheduling thread is serving a task, that task is unavailable to other scheduling threads. This is simply a consequence of the fact that a task is allocated to at most one scheduler thread at any given moment.
We assume that each scheduler thread is scheduled by the OS with probability P, independently of the others, and that there are N processors and N scheduler threads in the system. If the OS supports a real-time scheduler, the value of P can be controlled by the OS. For example, if scheduling threads are given higher priorities than all other threads, then P = 1. We consider the general case.
If P = 1, no task can ever get more than 1/N of the total time that the scheduler runs. For example, if N = 1, the maximum share is 100 percent, if N = 2, the maximum is 50 percent, etc. More generally, the probability of there being k scheduler threads running simultaneously is given by the binomial distribution, whose expected value is N · P. Hence, we expect the number of processors on which scheduler threads run in parallel to be simply the product of the number of processors and their probability of being scheduled. The maximum percentage of total scheduling time that any task can have is then given by
For fixed N this approaches 100 percent as P decreases, and for fixed P this approaches 1/N as N increases. Note that the scheduler is not fair in the sense used in Reference 13—that is, it is possible for one task to get more than it requested while another gets less than it requested. To make the scheduler both fair and work-conserving, it would be necessary to allow multiple scheduler threads to dispatch events from the same event queue simultaneously. This would complicate the scheduler, but more importantly, it would mean that FIFO delivery of events within a given task is no longer guaranteed by the scheduler.
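As a numerical check of the binomial model used in this analysis, the sketch below computes the probability that exactly k of N scheduler threads run simultaneously and recovers the expected value N · P by summation. The class `SchedulerThreadModel` and its method names are hypothetical; the sketch covers only the distribution and its mean, not the expression for the maximum share.

```java
/** Illustrative binomial model of running scheduler threads (hypothetical names). */
final class SchedulerThreadModel {

    /** Probability that exactly k of n scheduler threads run at the same time. */
    static double probabilityRunning(int n, int k, double p) {
        double coefficient = 1.0; // binomial coefficient C(n, k), built incrementally
        for (int i = 1; i <= k; i++) {
            coefficient = coefficient * (n - k + i) / i;
        }
        return coefficient * Math.pow(p, k) * Math.pow(1.0 - p, n - k);
    }

    /** Expected number of scheduler threads running in parallel, summed over k. */
    static double expectedRunning(int n, double p) {
        double expected = 0.0;
        for (int k = 0; k <= n; k++) {
            expected += k * probabilityRunning(n, k, p);
        }
        return expected;
    }
}
```

For example, with N = 8 and P = 0.5 the sum evaluates to 4, in agreement with N · P; with P = 1 all N threads run at once and a single task can hold at most one of them, which is the 1/N bound stated above.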
In all of the tests we performed, the following configuration was used unless otherwise stated. The time period over which the shares are valid was set to 10 ms. The number of buckets used in the priority queue was set to 100. We used a number of scheduling threads equal to the number of cores on the machines. The communication between machines was implemented with a Gigabit Ethernet network, and we used TCP (Transmission Control Protocol) for the transport layer, with the socket size set to 128 Kbytes. The message size was 128 bytes. All machines ran a 2.6 Linux** Kernel with Java 6. The machine (an IBM HS-21 blade) on which the subscribers ran had two 2.3-GHz, 4-core processors. All of the publishing machines (IBM HS-20 blades) had two 3.0-GHz, hyper-threading processors.
We measured the maximum number of messages per second we were able to send between a single publisher and a single subscriber. We also measured the effect on the average delay of increasing the sending rate. To avoid having to synchronize clocks on distinct machines, the end-to-end delay was measured by marking a random sample of the sent messages at the publishing application and having the subscribing application echo these messages back to the publisher over UDP (User Datagram Protocol). This method overestimates the end-to-end delay for low sending rates (less than 1000 messages per second), because the additional network latency is a significant fraction of the total delay, but is a good approximation for higher rates, where the network latency plays a lesser role.
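The bookkeeping behind this measurement can be sketched as follows. The class `EchoDelaySampler`, the message identifiers, and the explicit timestamp parameters are assumptions made for illustration; in practice the timestamps would come from a single clock on the publishing machine, such as `System.nanoTime()`.

```java
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sampler for echo-based delay measurement (hypothetical names). */
final class EchoDelaySampler {
    private final Map<Long, Long> sendTimes = new ConcurrentHashMap<>();
    private final Random random;
    private final double sampleFraction;

    EchoDelaySampler(double sampleFraction, long seed) {
        this.sampleFraction = sampleFraction;
        this.random = new Random(seed);
    }

    /**
     * Called by the publisher when it sends a message. Returns true if
     * the message was chosen for the sample and must be marked for echo.
     */
    boolean onSend(long messageId, long nowNanos) {
        if (random.nextDouble() >= sampleFraction) {
            return false;
        }
        sendTimes.put(messageId, nowNanos);
        return true;
    }

    /**
     * Called by the publisher when an echo arrives. Returns the elapsed
     * time in nanoseconds, or -1 if the message was not in the sample.
     * The value includes the return path of the echo.
     */
    long onEcho(long messageId, long nowNanos) {
        Long sent = sendTimes.remove(messageId);
        return sent == null ? -1 : nowNanos - sent;
    }
}
```

Both timestamps are taken on the publishing machine, so no clock synchronization is needed; the cost is that the return path is included, which is the source of the overestimate at low rates noted above.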
Figure 4 shows the evolution of the effective throughput and the end-to-end delay as a function of the publishing rate. The system sustains a rate of 120,000 messages per second (msg/s). The average end-to-end delay rises from below 1 ms at 1000 msg/s to slightly above 10 ms at 120,000 msg/s. Above this rate, the load is no longer sustainable, and the average delay rises dramatically.
Figure 4
By way of reference, we compare this with the figures for the real-time C++ implementation of CORBA-TAO** (Reference 10) on the same platform as described here. TAO could not sustain throughput above 4,000 msg/s. We also ran Tempo on a version of RTSJ with the Metronome garbage collector enabled, and found that we could not achieve more than 20,000 msg/s. We believe this is due to the overhead imposed by the dual memory model of RTSJ.
Figure 5 shows the delay distribution at 10,000 and 120,000 messages per second. The number of messages is represented with a logarithmic scale. We observed that the 95th percentile is at 1 ms for 10,000 msg/s and increases to 13.6 ms for 120,000 msg/s. At 10,000 msg/s, we observe some outlying data points just below 5 ms, while they climb to 120 ms at 120,000 msg/s.
Figure 5
We measured how the messaging system behaved with an increasing number of topics. We modeled the performance for a set of subscribers within a single JVM. Each subscriber subscribed to a distinct topic. We simulated each publisher on a different machine. Each publisher attempted to send at the same fixed message rate. Figure 6(A) shows how the cumulative throughput at the subscribers evolved as the number of publishers increased for a per-publisher sending rate of 70,000 msg/s. It exhibits almost linear scaling with an increasing number of topics up to the number of cores on the machine, demonstrating that topics can be serviced in parallel. The measured average delay behaved similarly to that reported in the subsection “Bandwidth and delay for one publisher and one subscriber,” with an average delay of less than 50 ms for a cumulative throughput of more than 500,000 msg/s.
Figure 6
An interesting effect was observed when we increased the fixed sending rate to 80,000 msg/s [see Figure 6(B)] and 90,000 msg/s [see Figure 6(C)]. The cumulative throughput initially increased linearly and then collapsed, meaning, for example, that the total throughput for eight publishers at 90,000 msg/s was significantly less than that achieved at 70,000 msg/s. Our initial thought was that this was due to message loss within the Linux kernel or the Ethernet switch causing the TCP throughput to collapse; however, we observed no TCP retransmissions. We currently assume that this is an artifact of the Linux kernel when subjected to high interrupt rates, but have not been able to verify this. Wu et al. (Reference 19) give an analysis of the behavior of the 2.6 Linux kernel for high-performance I/O-intensive computation. An in-depth study of these phenomena is beyond the scope of this paper.
We have shown how isolating topics through the allocation of a per-topic queue and an effective per-topic queuing server allows a messaging system to be modeled as multiple independent queuing systems. This facilitates analysis, as it allows estimates to be made on the delays in the system under various conditions.
The per-topic server is achieved through use of a time-based credit scheduling mechanism that allocates a share of the total service time to a topic. Building such a scheduling mechanism on a multi-core system is challenging, as the threads that comprise the scheduler must coordinate to ensure that the correct shares are allocated. In a conventional lock-based implementation, the synchronization overhead is prohibitive. We have shown that the use of lock-free data structures enables our system to scale on multi-core systems without compromising performance. While the high throughput and low latency we have demonstrated are in part a consequence of the simplicity of the Tempo messaging system, the underlying principles are also applicable to more sophisticated systems.
Tempo is written in a general-purpose programming language and makes no assumptions about real-time support from the OS. We have shown how it can be integrated into a complete real-time system to achieve bounded delay or used on a non-real-time system to achieve low delay.
*Trademark, service mark, or registered trademark of International Business Machines Corporation in the United States, other countries, or both.
**Trademark, service mark, or registered trademark of Sun Microsystems, Inc., the Object Management Group, Inc., Linus Torvalds, or Douglas C. Schmidt in the United States, other countries, or both.
Accepted for publication November 15, 2007; Published online May 14, 2008.