Dealing with concurrency in Node.js


Even though the Event Loop runs our JavaScript on a single thread, we still have to take care of race conditions, because most of our code runs asynchronously: callbacks from different operations can interleave in an order we don’t control.
Callbacks and Promises are good examples of this. There are many resources on the web about how the Event Loop works, so this post simply assumes that we have a resource in our code that can be accessed (read and written) by multiple concurrent operations, which we’ll loosely call "threads".

Here we have a small snippet that shows how to deal with a race condition. A common scenario is when we cache some data that was expensive to get in terms of CPU, network, file system or DB.
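
Before getting to the fix, it may help to see the problem itself. The sketch below is a naive cache that stores the value only after it has arrived. All names (naiveCache, slowLoad, naiveGet, loadCount) are hypothetical, and the 50 ms timer is an assumed stand-in for the expensive call.

```javascript
// Hypothetical names throughout; the timer stands in for an expensive DB or network call.
const naiveCache = new Map();
let loadCount = 0;

function slowLoad(id) {
    loadCount++;
    return new Promise((resolve) => {
        setTimeout(() => resolve({ id: id, name: 'user' + id }), 50);
    });
}

// Naive version: the cache is filled only once the value has arrived.
async function naiveGet(id) {
    if (naiveCache.has(id)) {
        return naiveCache.get(id);
    }
    const user = await slowLoad(id); // other callers can run while we wait here
    naiveCache.set(id, user);
    return user;
}

async function demoNaive() {
    // Three callers ask for the same user before the first load finishes.
    await Promise.all([naiveGet(3), naiveGet(3), naiveGet(3)]);
    return loadCount;
}

const naiveDemo = demoNaive();
naiveDemo.then((count) => console.log('Expensive loads: ' + count)); // Expensive loads: 3
```

Every caller finds the cache empty, so the expensive load runs three times instead of once. That duplicated work is the race condition the rest of this post avoids.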

Implementation

We might implement a cache in multiple ways. A simple way is an in-memory collection; in this case, a Map. The collection could also be a List; which one to use depends on our requirements.

Our Map holds users: the user ID is the key and the user itself (wrapped in a Promise) is the value. That way, a method such as getUserById will be very fast: O(1).

I’ll explain it step by step, but the full source code is at the end of this post.

So let’s start with our map:

      
const cache = new Map();
    

Our Map won’t be very smart in this example: it won’t expire elements after a while, and it will keep adding elements for as long as there is memory available. A more advanced solution would add that kind of logic to avoid performance issues. It will also be empty after our server restarts, so it is not persistent.
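
As a rough idea of what that extra logic could look like, here is a minimal sketch of a cache with a maximum size and a TTL. The BoundedCache class and its parameters are hypothetical and not part of this post’s code; it evicts the oldest inserted entry (not the least recently used one) and assumes maxSize is at least 1.

```javascript
// Sketch of a cache with a TTL and a maximum size. All names here are hypothetical.
class BoundedCache {
    constructor(maxSize, ttlMs) {
        this.maxSize = maxSize;
        this.ttlMs = ttlMs;
        this.entries = new Map(); // key -> { value, expiresAt }
    }

    // `now` can be passed in to make the behaviour easy to test.
    get(key, now = Date.now()) {
        const entry = this.entries.get(key);
        if (entry === undefined) {
            return undefined;
        }
        if (entry.expiresAt <= now) {
            this.entries.delete(key); // expired: forget it
            return undefined;
        }
        return entry.value;
    }

    set(key, value, now = Date.now()) {
        // Deleting first moves a re-inserted key to the end of the insertion order.
        this.entries.delete(key);
        if (this.entries.size >= this.maxSize) {
            // A Map iterates in insertion order, so the first key is the oldest one.
            const oldestKey = this.entries.keys().next().value;
            this.entries.delete(oldestKey);
        }
        this.entries.set(key, { value: value, expiresAt: now + this.ttlMs });
    }
}

const boundedCache = new BoundedCache(2, 1000);
boundedCache.set('a', 1, 0);
boundedCache.set('b', 2, 0);
boundedCache.set('c', 3, 0); // the cache is full, so 'a' is evicted
console.log(boundedCache.get('a', 0));    // undefined
console.log(boundedCache.get('b', 500));  // 2
console.log(boundedCache.get('b', 1500)); // undefined (expired)
```

The values stored in such a cache could be Promises, exactly as in the plain Map used in this post.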

Let’s create a collection of users that simulates our DB:

      
const users = [];
function createSomeUsers() {
    for (let i = 0; i < 10; i++) {
        const user = {
            id: i,
            name: 'user' + i
        };
        users.push(user);
    }
}
 

The main method, where we want to take care of the race condition:

      
function getUserFromDB(userId) {
    let userPromise = cache.get(userId);
    if (typeof userPromise === 'undefined') {
        console.info('Loading ' + userId + ' user from DB...'); // should be executed only once for each user
        userPromise = new Promise(function (resolve, reject) {
            //setTimeout will be our executeDBQuery
            const threeSeconds = 1000 * 3;
            setTimeout(() => {
                const user = users[userId];
                resolve(user);
            }, threeSeconds);
        });
        // cache the pending promise right away, so concurrent callers reuse it instead of querying the DB again
        cache.set(userId, userPromise);
    }
    return userPromise;
}
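
One case the function above does not cover is failure: if the promise rejects, the rejected promise stays in the cache and every later caller gets the same error. A possible variation, sketched under the assumption of a generic loader function (getCached, promiseCache and flakyLoader are hypothetical names), evicts the entry when the load fails so that a later call can retry.

```javascript
const promiseCache = new Map();

// Hypothetical wrapper: `loader` is any function that returns a Promise for the given key.
function getCached(key, loader) {
    let promise = promiseCache.get(key);
    if (promise === undefined) {
        promise = loader(key);
        promiseCache.set(key, promise);
        // If the load fails, drop the entry so a later call can retry.
        promise.catch(() => {
            // Only delete the entry if it is still the promise we stored.
            if (promiseCache.get(key) === promise) {
                promiseCache.delete(key);
            }
        });
    }
    return promise;
}

let attempts = 0;
// Simulated loader that fails on the first attempt and succeeds afterwards.
function flakyLoader(id) {
    attempts++;
    return attempts === 1
        ? Promise.reject(new Error('DB unavailable'))
        : Promise.resolve({ id: id, name: 'user' + id });
}

async function demoRetry() {
    try {
        await getCached(3, flakyLoader);
    } catch (err) {
        console.log('First call failed: ' + err.message);
    }
    const user = await getCached(3, flakyLoader); // the entry was evicted, so this retries
    console.log('Second call loaded ' + user.name);
    return user;
}

const retryDemo = demoRetry();
```

Callers still have to handle the rejection themselves; the wrapper only makes sure the failure is not cached forever.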
 

To test our race condition we’ll need several callbacks, each simulating a heavy operation.
We’ll simulate that with the classic setTimeout, which appears later, fed by a random delay from this helper:

      
function getRandomTime() {
    return Math.round(Math.random() * 1000);
}
 

Finally, the method that simulates the race condition:

      
function executeRace() {
    const userId = 3;
    //get the user #3 10 times to test race condition
    for (let i = 0; i < 10; i++) {
        setTimeout(() => {
            getUserFromDB(userId).then((user) => {
                console.log('[Thread ' + i + ']User result. ID: ' + user.id + ' NAME: ' + user.name);
            }).catch((err) => {
                console.log(err);
            });
        }, getRandomTime());
        console.info('Thread ' + i + ' created');
    }
}
 

Our last step: call our methods to create some users and to execute the race condition

      
    createSomeUsers();
    executeRace();
 

Let’s create a file called race_condition.js and execute it like this:

      
    node race_condition.js
 

The output will look something like this:

      
    Dummy users created
    Thread 0 created
    Thread 1 created
    Thread 2 created
    Thread 3 created
    Thread 4 created
    Thread 5 created
    Thread 6 created
    Thread 7 created
    Thread 8 created
    Thread 9 created
    Loading 3 user from DB...
    [Thread 8]User result. ID: 3 NAME: user3
    [Thread 3]User result. ID: 3 NAME: user3
    [Thread 1]User result. ID: 3 NAME: user3
    [Thread 9]User result. ID: 3 NAME: user3
    [Thread 5]User result. ID: 3 NAME: user3
    [Thread 2]User result. ID: 3 NAME: user3
    [Thread 7]User result. ID: 3 NAME: user3
    [Thread 0]User result. ID: 3 NAME: user3
    [Thread 6]User result. ID: 3 NAME: user3
    [Thread 4]User result. ID: 3 NAME: user3
 

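Notice that the [Thread X] lines do not appear in order. That’s because of the random delay that simulates a thread taking time to resolve. Notice also that "Loading 3 user from DB..." is printed only once, even though ten callers asked for the same user.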
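
Reading the log is one way to confirm that the DB was hit only once; a counter makes the same check explicit. The sketch below repeats the promise-caching pattern with hypothetical names (checkCache, dbCalls, getUserCounted, randomDelay) and shorter, assumed timers so that it finishes quickly.

```javascript
const checkCache = new Map();
let dbCalls = 0;

// Same promise-caching pattern as getUserFromDB, with a counter instead of a log line.
function getUserCounted(userId) {
    let userPromise = checkCache.get(userId);
    if (userPromise === undefined) {
        dbCalls++;
        userPromise = new Promise((resolve) => {
            // The timer stands in for the DB query.
            setTimeout(() => resolve({ id: userId, name: 'user' + userId }), 100);
        });
        checkCache.set(userId, userPromise);
    }
    return userPromise;
}

// Resolves after a random delay below 50 ms.
function randomDelay() {
    return new Promise((resolve) => setTimeout(resolve, Math.random() * 50));
}

async function checkSingleLoad() {
    const callers = [];
    for (let i = 0; i < 10; i++) {
        callers.push(randomDelay().then(() => getUserCounted(3)));
    }
    const results = await Promise.all(callers);
    // Every caller should receive the very same user object.
    const sameObject = results.every((user) => user === results[0]);
    return { dbCalls: dbCalls, sameObject: sameObject };
}

const singleLoadCheck = checkSingleLoad();
singleLoadCheck.then((result) => console.log(result)); // { dbCalls: 1, sameObject: true }
```

All ten callers share one pending promise, so the counter stays at 1 no matter in which order the callers arrive.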

Full source code

      
/**
 * A cache implemented with a map collection
 * key: userId. 
 * value: a Promise that can be pending, resolved or rejected. The result of that promise is a user
 * IMPORTANT: 
 *  - This cache has no max size and no TTL, so it will grow indefinitely
 *  - This cache will be reset every time the script restarts. We could use Redis to avoid this
 */
const cache = new Map();
/**
 * Our collection that will simulate our DB
 */
const users = [];
/**
 * Fills the simulated DB with 10 dummy users
 */
function createSomeUsers() {
    for (let i = 0; i < 10; i++) {
        const user = {
            id: i,
            name: 'user' + i
        };
        users.push(user);
    }
    console.info('Dummy users created');
}
 
 
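/**
 * Returns the user from the cache, loading it from the simulated DB on the first request
 * @param {number} userId
 * @returns {Promise<Object>} a promise that resolves to the user
 */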
function getUserFromDB(userId) {
    let userPromise = cache.get(userId);
    if (typeof userPromise === 'undefined') {
        console.info('Loading ' + userId + ' user from DB...'); // should be executed only once for each user
        userPromise = new Promise(function (resolve, reject) {
            //setTimeout will be our executeDBQuery
            const threeSeconds = 1000 * 3;
            setTimeout(() => {
                const user = users[userId];
                resolve(user);
            }, threeSeconds);
        });
        // cache the pending promise right away, so concurrent callers reuse it instead of querying the DB again
        cache.set(userId, userPromise);
    }
    return userPromise;
}
 
/**
 * @returns a number between 0 and 1000 milliseconds
 */
function getRandomTime() {
    return Math.round(Math.random() * 1000);
}
 
/**
 * Requests the same user 10 times, each after a random delay, to exercise the race condition
 */
function executeRace() {
    const userId = 3;
    //get the user #3 10 times to test race condition
    for (let i = 0; i < 10; i++) {
        setTimeout(() => {
            getUserFromDB(userId).then((user) => {
                console.log('[Thread ' + i + ']User result. ID: ' + user.id + ' NAME: ' + user.name);
            }).catch((err) => {
                console.log(err);
            });
        }, getRandomTime());
        console.info('Thread ' + i + ' created');
    }
}
 
createSomeUsers();
executeRace();
    
    
Photo by Ryoji Iwata on Unsplash


About the author

Andrés Canavesi

Software Engineer with 15+ years of experience in software development, specialized in Salesforce, Java and Node.js.

