DEBUGGING REAL-TIME SYSTEMS

Modular and incremental development is the key

This article contains the following executables: REAL.ARC

Gurjot Singh, Moses Joseph, and Dave Barnett

The authors all work for Lynx Real-Time Systems, where Gurjot is director of product planning, Moses is vice president of marketing, and David is manager of technical support. They can be contacted at 16780 Lark Ave., Los Gatos, CA 95030.


With some systems, missing 1 out of 10,000 samples may be quite acceptable. When real-time, mission-critical data acquisition is involved, however, missing even this one statistically insignificant sample renders the system unreliable. The question, then, is how to build real-time systems that perform the functions they were designed for and still meet the deadlines of their time-critical tasks.

Modular and incremental development and debugging is one solution to this problem. In this article, we'll use a simulated data-acquisition system to describe this process. While the example has been simplified, the general principles extend to more complex systems.

In the example, we used the LynxOS operating system, a 33-MHz 80486 PC clone, and the DCC20A timer card from Industrial Computer Source (San Diego, California) to create a self-profiling, real-time system. The system includes a driver and a test application. We chose the DCC20A timer card because it allowed us to generate programmable synchronous interrupts to simulate a continuous synchronous real-time application, and it enabled us to profile our real-time system.

The application was written to conform to real-time POSIX specifications (POSIX 1003.4 and POSIX 1003.4a) and can be ported to any real-time POSIX-compliant operating system. (POSIX.1, POSIX.4, and POSIX.4a provide a standard Application Programming Interface (API) for real-time implementations. POSIX.4 is at draft 12 and POSIX.4a is at draft 6. Both are likely to be ratified in the near future.) The device driver is the only part of the system specific to LynxOS.

A diagram of the data-acquisition system is shown in Figure 1. It has a synchronous interrupting device, one producer task, and two consumer tasks. We used two timers from the DCC20A timer card: Timer 5 to generate the synchronous interrupts and Timer 1 as a count-down timer. Timer 5 is programmed to reset Timer 1 at every interrupt. This allowed us to use Timer 1 to profile the various tasks. The interrupt service routine (ISR) for Timer 5 signals the producer task when the interrupt occurs. The producer task records the task response-time data and signals the consumer tasks to read the times out of the shared data. The consumer tasks execute and signal the producer when they are done. The cycle then starts all over again at the next interrupt. The objective is to ensure that all processing for a single pass be done before the next interrupt arrives and that the data integrity is preserved for the shared data. Mutexes and condition variables are used to ensure data integrity and to synchronize access to the data. The overall flow is shown in Figure 2.
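Because Timer 1 is restarted at every Timer 5 interrupt, the time elapsed since the interrupt can be recovered by subtracting the current counter reading from the value the counter was reloaded with. The following is a minimal sketch of that arithmetic. The function names, the reload value, and the tick rate are assumptions made for illustration; the article does not give them, and the actual driver code may differ.

```c
/* Sketch only: the reload value, tick rate, and function names are
 * illustrative assumptions, not taken from the DCC20A documentation. */

/* Ticks elapsed since the count-down timer was last reloaded. */
unsigned long elapsed_ticks(unsigned long reload, unsigned long reading)
{
    /* A count-down timer starts at `reload` and decreases toward 0. */
    return reload - reading;
}

/* Convert ticks to microseconds for a timer clocked at `hz` ticks/second. */
unsigned long ticks_to_usec(unsigned long ticks, unsigned long hz)
{
    /* Widen before multiplying so the product cannot overflow. */
    return (unsigned long)(((unsigned long long)ticks * 1000000ULL) / hz);
}
```

Reading the counter on entry to the ISR, on exit from the ISR, and at the start of the producer would give the kind of response-time figures discussed later in the article.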

In an ideal situation with no other interrupting devices, you'd expect to see the sequence of events shown in Figure 3. In this case, when Timer 5 generates an interrupt, the operating system invokes the ISR. After the ISR has executed, the producer task is scheduled and run. Finally the consumer tasks execute and the system waits for the next interrupt to occur. However, in the real world you can have delays or "blocks" due to interrupts from other devices or temporarily disabled preemption. These delays can impact the system's overall performance.

Phase 1: Writing the Skeleton Application

During this initial phase, we created the skeleton application (see Listing One, page 116) which includes the producer task and the two consumer tasks. The detailed implementation of the producer and consumer modules (used for the final testing and profiling) was completed later. Because of space restrictions, this code is only available electronically. The C routines for each task are simple and have profiling points built into them. The emphasis is on debugging concepts rather than solving a specific problem.

The application consists of four modules: simulate.c, the main module; producer.c, the producer; display.c, one of the consumers; and synch.c, the synchronization module.

The simulate module sets up an array of consumers and then creates a thread for each one. Each consumer runs at a priority 1 less than the producer's priority. The main program takes the name of the timer_device as an argument. It then sets the priority of the producer to 17 and initializes the synchronization routines. At this point the main or root thread becomes the producer thread.

The producer module waits for a signal from the ISR. For the debugging phase, wait_for_interrupt() is #defined to sleep(5). Notice that record_trt() and record_tct() are empty macros and do nothing during the first phase. When an interrupt is received, the producer signals the consumers that the data is present and then waits for them to finish.

Consumer1 and Consumer2 are identical in functionality (although only Consumer1 is shown in the display module). They wait for a signal from the producer, complete their tasks, and then signal the producer when they are done. We've left out the actual details for each consumer for the initial testing.

The synch module is the most important module in the first phase. This is where the synchronization between the producer and consumers is done. The init_data routine initializes a mutex and two condition variables. It also allocates and initializes an array of integers (done in Listing One, referred to here as the done_flags and used as Boolean values) to ensure a single access to each filled buffer. A 0 value signifies that the consumer has not yet accessed the buffer. The wait_for_consumers routine (called by the producer) simply blocks until all the readers have finished. The signal_consumers routine resets the done_flags and broadcasts to the consumers to perform their tasks. The signal_producer routine increments the readers_done count when another consumer is done. If all the consumers are done, it signals the producer with write_cond. It also sets the consumer's done_flag to 1 to prevent this consumer from reading again. The wait_for_producer routine prevents the consumer from getting the same buffer twice by checking the value of its done_flag.

Phase 2: Check Initialization and Synchronization

During this phase, we compiled and linked the application modules using Gcc (GNU's C compiler) with the -g option that creates debug information for the debugger. We used Ldb (Lynx's debugger) to load and debug the resulting executable code. Our goal in debugging these modules was to ensure that the initialization was working correctly and that the synchronization mechanism we implemented worked as desired.

We set the following breakpoints in Ldb: producer.c:8 (BP1), the start of the producer; display.c:7 (BP2), the start of consumer 1; and record.c:7 (BP3), the start of consumer 2. We stepped through the simulate module and stopped at BP1. At this point the main thread (the producer) is running, and the two consumers have been created but are not active. Since Lynx's Ldb is a multithreaded debugger, we could see that three threads had been created but only the main thread, P1, was active; see Figure 4. If you click on GO again, the two consumers race for CPU time, but we don't really care which runs first. In this case we hit BP3 before BP2.

Slow-speed Simulation

We had #defined wait_for_interrupt to sleep(5). At this point, we had not yet written the device driver to generate the interrupts, so we used sleep(5) to simulate the generation of the interrupts. Ldb has a feature that informs us whenever the SIGALRM occurs at the completion of the sleep; see Figure 5, where we enable this condition. This allowed us to make sure that our application ran periodically and that for every interrupt, the producer ran first, followed by the two consumers, each of which ran only once. In fact, we discovered a bug in the synch.c module when we ran the program using Ldb. We found that for the first interrupt only, the consumers ran before the producer had signaled them. This was because we had initialized the done_flag to 0 instead of 1. Ldb helped us track that down within the first few minutes of our debugging process. Notice that we first debugged the skeleton application. Trivial as it may sound, it's a good practice to do so before completing the final application. If you wait until examining the final code, a synchronization or an initialization bug is much harder to track down.
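The effect of the initial flag value can be seen in a small single-threaded model of the test a consumer makes in wait_for_producer. This is only an illustrative sketch: would_block and runs_before_first_signal are hypothetical helpers that stand in for the done[id] check in Listing One, and no threads are involved.

```c
/* Single-threaded model of the test a consumer makes in
 * wait_for_producer(): it blocks only while its done flag is set.
 * would_block() is a hypothetical helper, not part of Listing One. */
int would_block(int done_flag)
{
    return done_flag != 0;
}

/* Returns 1 if a consumer whose flag starts at `initial_flag` would run
 * before the producer has signaled for the first time, 0 otherwise. */
int runs_before_first_signal(int initial_flag)
{
    return !would_block(initial_flag);
}
```

With an initial value of 0 the consumer falls straight through on the first pass; with an initial value of 1 it waits until signal_consumers clears the flag.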

The kinds of bugs you encounter may be different, but the debugging process is similar. If you don't have a multithreaded debugger, then you'll probably use the age-old method of embedding printf statements to debug the application.
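If printf-style tracing is the only tool available, tagging each message with the task name and a pass number makes ordering problems easier to spot. The following is a minimal sketch; format_trace, trace, and the task and event strings are hypothetical names that do not appear in the article's code.

```c
#include <stdio.h>

/* Hypothetical helper: builds one trace line such as
 * "[pass 3] consumer1: start" into buf. Returns the snprintf result. */
int format_trace(char *buf, size_t size, unsigned pass,
                 const char *task, const char *event)
{
    return snprintf(buf, size, "[pass %u] %s: %s", pass, task, event);
}

/* Print one trace line to stderr, which is normally unbuffered. */
void trace(unsigned pass, const char *task, const char *event)
{
    char line[128];
    format_trace(line, sizeof line, pass, task, event);
    fprintf(stderr, "%s\n", line);
}
```

A call such as trace(3, "consumer1", "start") at the top of each task's loop shows whether the producer ran first on every pass. Keep in mind that printing perturbs timing, so this approach suits checking the order of events more than measuring them.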

Phase 3: Driver and Application Completion

Once we had the synchronization mechanism debugged, we completed the rest of our application and the driver that generates the interrupts. Some application modules were modified so we could record the minimum and maximum values for the parameters shown in Figure 3. As previously mentioned, the code for the complete system is available electronically. Also provided electronically are detailed programmer's notes that describe the programs and relevant modules.

Phase 4: Integration, Testing, and Profiling

After completing the application, we used Ldb to test it with the driver we had written for LynxOS. As in the slow-speed simulation, we slowed the system down, this time by programming the timer to generate an interrupt every minute, so we could step through our producer and consumers to confirm that everything ran as expected. We also simulated a failure by deliberately missing an interrupt.

An important goal in the debugging process of a real-time application is to make sure that the system performs to its specifications and meets the various response times described in the first part of this article. After we integrated the device driver and application, we ran the application with many different interrupt periods. The display module output for two of these periods is shown in Figures 6(a) and 6(b) (page 116), which list the minimum and maximum values during five-second intervals. (The column headings for Figure 6 are: IRT, interrupt response time; IST, interrupt service time; INT, interrupt handling time [IRT+IST]; SYS, system overhead [scheduling, context switches, and any other interrupts]; TRT, task response time. The INT figures are the best and worst cases of the combined IRT+IST value; that is, the best-case INT is not the sum of the best-case IRT and the best-case IST taken individually.) In each category, the left-hand column is the minimum and the right-hand column is the maximum time in microseconds. The results were the minimums and maximums for 8333 iterations for the 600-microsecond interrupt period and 11,111 iterations for the 450-microsecond interrupt period.
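The per-interval bookkeeping described above amounts to tracking a running minimum and maximum for each measured quantity and resetting them when the interval ends. The following is a minimal sketch. The names struct minmax, minmax_reset, minmax_update, and iterations_per_interval are assumptions made for illustration; the article's own display module is available only electronically and may be organized differently.

```c
#include <limits.h>

/* Running minimum and maximum of one measured time, in microseconds. */
struct minmax {
    unsigned long min;
    unsigned long max;
};

/* Start a new reporting interval. */
void minmax_reset(struct minmax *m)
{
    m->min = ULONG_MAX;
    m->max = 0;
}

/* Fold one sample into the running minimum and maximum. */
void minmax_update(struct minmax *m, unsigned long sample)
{
    if (sample < m->min) m->min = sample;
    if (sample > m->max) m->max = sample;
}

/* Whole interrupt periods that fit in one reporting interval. */
unsigned long iterations_per_interval(unsigned long interval_usec,
                                      unsigned long period_usec)
{
    return interval_usec / period_usec;
}
```

For a five-second interval, iterations_per_interval(5000000, 600) gives 8333 and iterations_per_interval(5000000, 450) gives 11111, which matches the iteration counts quoted above.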

Figure 6(a) shows no overruns while 6(b), with its 450-microsecond period, shows several. The overruns indicate that we had not completed the consumer tasks before the next interrupt occurred; that is, the period is less than the total completion time (TRT+TCT). To find out whether the problem was in the application code, driver routine, or in kernel overhead, we modified the application to print each iteration instead of accumulating results for five seconds; Figure 6(c) shows a small sample of the results. We were looking for the overrun and the case prior to it. In our system, we have three other interrupting devices: the disk interrupt, keyboard interrupt, and system clock. The single-iteration listing shows that the SYS (case 1), IRT (case 2), or IST (case 3) durations are longer than the minimum times for each category in Figure 6(a). This is probably due to a higher-priority interrupt from one of the three aforementioned devices, but the overall variations are within an expected range. This implies that we probably have a long task completion time (TCT), which is causing us to miss the next interrupt. When we raised the interrupt period to 600 microseconds, we found that the overruns dropped to 0. In fact, if we were to do a rigorous worst-case analysis, we would probably discover that the worst-case cycle time is between 450 and 600 microseconds.
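The overrun condition can be stated directly: a pass overruns when the task response time plus the task completion time exceeds the interrupt period. The following is a minimal sketch with hypothetical function names; the sample values used with it here are made up for illustration and are not the measured data from Figure 6.

```c
/* Returns 1 when the work for one pass (TRT + TCT) does not fit in the
 * interrupt period, 0 otherwise. All times are in microseconds.
 * is_overrun() is a hypothetical helper for illustration. */
int is_overrun(unsigned long period_usec,
               unsigned long trt_usec, unsigned long tct_usec)
{
    return trt_usec + tct_usec > period_usec;
}

/* Slack left in the period after the pass completes (0 on overrun). */
unsigned long slack_usec(unsigned long period_usec,
                         unsigned long trt_usec, unsigned long tct_usec)
{
    unsigned long used = trt_usec + tct_usec;
    return used > period_usec ? 0 : period_usec - used;
}
```

With an assumed TRT of 100 microseconds and TCT of 400 microseconds, is_overrun reports an overrun at a 450-microsecond period and none at 600 microseconds, where 100 microseconds of slack remain.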

Summary

We have stepped through a typical development and debugging cycle of a simple, but representative, real-time application. The example demonstrates how to develop and debug the application in a systematic and incremental way, taking advantage of a powerful but user-friendly multithreaded debugger like Ldb. Since deterministic performance is the most critical issue in real time, we focused on profiling the real-time application and verifying correctness in terms of worst-case performance. In the real world, you can expect to find more complex situations, but the principles demonstrated here can be extended, and in fact are even more important. Since every major real-time vendor is promising to support the real-time POSIX programming interfaces, we avoided using any proprietary LynxOS calls so that the example should be applicable on any system where the POSIX-compatible versions are available.





[LISTING ONE]


/*  simulate.c -- Simulate a real-time system and measure and/or record.
**     1. Device interrupt response time; 2. Device driver interrupt service
**     time; 3. Task response time
** Each measured time is exaggerated by a constant amount of time equal to the
** length of time it takes to make the measurement.
*/

#include <stdio.h>
#include <pthread.h>

#define PRODUCER_PRIO 17

extern void producer();
extern void display();
extern void record();

struct {
    void (*f)();    /* task entry point                                */
    int p_bias;     /* priority relative to producer (always negative) */
} consumers[] = {   /* must be an array: indexed and sized below       */
    { display, -1 },
    { record, -1 }
};
#define NCONSUMERS (sizeof consumers / sizeof consumers[0])
main(argc, argv)
int argc;
char *argv[];
{
    int i;
    if (argc != 2) {
        fprintf(stderr, "Usage: %s timer_device\n", argv[0]);
        exit(1);
    }
    init_data(NCONSUMERS);
    for (i = 0; i < NCONSUMERS; i++) {
          pthread_attr_t attr;
          pthread_t tid;

          pthread_attr_create(&attr);
          pthread_attr_setinheritsched(&attr, PTHREAD_DEFAULT_SCHED);
          pthread_attr_setprio(&attr, PRODUCER_PRIO + consumers[i].p_bias);
          if (pthread_create(&tid, attr, consumers[i].f, i) == -1) {
            perror("pthread_create");
            exit(1);
          }
    }
    init_timer(argv[1]);
    producer();
    exit(0);
}
/* producer.c */
#define wait_for_interrupt() sleep(5)
#define record_trt()
#define record_tct()

void producer()
{
    for (;;) {
        wait_for_interrupt();
        record_trt();
        signal_consumers();
        wait_for_consumers();
        record_tct();
    }
}
/* consumer1 */
void display(id)
int id;
{
    for (;;) {
        wait_for_producer(id);
        signal_producer(id);
    }
}
/* synch.c */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

static pthread_mutex_t mutex;
static pthread_cond_t write_cond; /* O.K. to write */
static pthread_cond_t read_cond; /* O.K. to read */
static int readers_done;
int *done;
static int num_readers;

void init_data(readers)
int readers;
{
    int i;
    if (pthread_mutex_init(&mutex, pthread_mutexattr_default) == -1) {
        perror("pthread_mutex_init");
        exit(1);
    }
    if (pthread_cond_init(&write_cond, pthread_condattr_default) == -1) {
        perror("pthread_cond_init");
        exit(1);
    }
    if (pthread_cond_init(&read_cond, pthread_condattr_default) == -1) {
        perror("pthread_cond_init");
        exit(1);
    }
    num_readers = readers;
    readers_done = num_readers;
    if (!(done = (int *)malloc(num_readers * sizeof(int)))) {
        perror("malloc");
        exit(1);
    }
    for (i = 0; i < num_readers; i++) done[i] = 1;
}
void wait_for_consumers()
{
    pthread_mutex_lock(&mutex);
    /* Loop rather than test once: a condition wait may return spuriously. */
    while (readers_done != num_readers) {
        pthread_cond_wait(&write_cond, &mutex);
    }
    pthread_mutex_unlock(&mutex);
}
void signal_consumers()
{
    int i;
    pthread_mutex_lock(&mutex);
    readers_done = 0;
    for (i = 0; i < num_readers; i++) done[i] = 0;
    pthread_cond_broadcast(&read_cond);
    pthread_mutex_unlock(&mutex);
}
void signal_producer(id)
int id;
{
    pthread_mutex_lock(&mutex);
    readers_done++;
    if (readers_done == num_readers) {
        pthread_cond_signal(&write_cond);
    }
    done[id] = 1;
    pthread_mutex_unlock(&mutex);
}
void wait_for_producer(id)
int id;
{
    pthread_mutex_lock(&mutex);
    /* Re-check the flag after every wakeup before reading the buffer. */
    while (done[id]) pthread_cond_wait(&read_cond, &mutex);
    pthread_mutex_unlock(&mutex);
}







Copyright © 1992, Dr. Dobb's Journal