Incremental Data Reader

An Incremental Data Reader fetches only the records that have changed since its last run instead of repopulating the full data set. This requires the data source (API or database) to expose a pointer or date that makes changes trackable. Incremental data readers are based on checkpoints. A checkpoint can be, for example, the date and time the data reader last ran, in which case each invocation retrieves the records changed since that timestamp.

On the first run, the checkpoint value in the data reader's configuration is null. When a data reader completes successfully, the value returned by GetCheckpoint() on the data reader is persisted to the configuration. If the data reader does not override GetCheckpoint(), the persisted value is the UTC date and time at which the data reader started.
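
The checkpoint lifecycle described above can be sketched as a small simulation. This is not the SDK's implementation: SketchConfig, CheckpointLifecycleSketch, and the round-trip ("o") timestamp format are assumptions made for illustration, as is the behavior that a failed run leaves the previous checkpoint untouched.

```csharp
using System;

// Hypothetical stand-in for the persisted reader configuration; not an SDK type.
public sealed class SketchConfig
{
    // Null until the first successful run.
    public string? Checkpoint { get; set; }
}

public static class CheckpointLifecycleSketch
{
    // Simulates the end of one run.
    // customCheckpoint plays the role of an overridden GetCheckpoint();
    // null means the reader did not override it.
    public static void CompleteRun(
        SketchConfig config,
        DateTime startedUtc,
        bool succeeded,
        string? customCheckpoint = null)
    {
        if (!succeeded)
        {
            // Assumption: a failed run persists nothing, so the next run
            // resumes from the previous checkpoint.
            return;
        }

        // Without an override, fall back to the UTC start time of the run.
        // The "o" (round-trip) format is an assumption of this sketch.
        config.Checkpoint = customCheckpoint ?? startedUtc.ToString("o");
    }
}
```

In this sketch, a new SketchConfig starts with a null Checkpoint, a successful run without an override stores the run's UTC start time, and a later successful run that supplies a cursor stores that cursor instead.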

Creating an Incremental Data Reader

When creating a data object via the CLI tooling, you can indicate that the generated data reader should be incremental.

xchange data-object new --module-id app-1 --name Example --reader-type incremental

The data reader is mostly the same as a full data reader, with the addition of checkpoint logic. For example, if your API accepts a cursor and returns the set of all changes since that cursor, you could implement the data reader as follows:

App/v1/Example/ExampleDataReader.cs
public class ExampleObjectDataReader : IncrementalDataReaderBase<ExampleObjectDataReader>
{
    private readonly ILogger<ExampleObjectDataReader> _logger;
    private readonly IHostContext _hostContext;
    private readonly IApiClient _apiClient;
    private readonly IncrementalCacheWriterConfig _config;
    private string? _checkpoint;

    public ExampleObjectDataReader(
        ILogger<ExampleObjectDataReader> logger,
        IHostContext hostContext,
        IApiClient client,
        AppV1CacheWriterConfig config)
    {
        _logger = logger;
        _hostContext = hostContext;
        _apiClient = client;
        _config = config.ExampleObjectConfig;
    }

    public override string GetCheckpoint()
    {
        return _checkpoint;
    }

    public override async IAsyncEnumerable<IDataReaderObject<ExampleDataObject>> GetTypedDataAsync(
        DataObjectCacheWriteArguments? dataObjectRunArguments, 
        [EnumeratorCancellation] CancellationToken cancellationToken) 
    {
        var response = new ApiResponse<IEnumerable<ExampleDataObject?>>();
        try
        {
            response = await _apiClient.GetExamples(_config.Checkpoint, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            // Pass the exception itself so the stack trace is logged along with the message.
            _logger.LogError(exception, "API request failed: {Message}", exception.Message);
            throw;
        }

        if (response.Result != null && response.IsSuccessful)
        {
            foreach (var item in response.Result.Changes)
            {
                var resource = new ExampleDataObject()
                {
                    Age = item.Age,
                    Date = item.Date,
                    Name = item.Name,
                    Position = item.Position,
                    Id = item.Id,
                };
                yield return new DataReaderObject<ExampleDataObject>(resource, UpdateOperation.Upsert);
            }

            foreach (var item in response.Result.Deletes)
            {
                var resource = new ExampleDataObject()
                {
                    Age = item.Age,
                    Date = item.Date,
                    Name = item.Name,
                    Position = item.Position,
                    Id = item.Id,
                };
                yield return new DataReaderObject<ExampleDataObject>(resource, UpdateOperation.Delete);
            }
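
            // Store the cursor only after a successful read, where Result is known to be non-null,
            // so the next run resumes from it.
            _checkpoint = response.Result.Cursor;
        }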
    }
}

Convert a Full Data Reader to Incremental

If you already have a data object and want to read it incrementally, the following steps walk through the conversion.

The example code below converts a data object named Example in a module named App.

Updating the Cache Writer Service Configuration

First, update the cache writer service's configuration so that the ExampleObjectConfig property is of type IncrementalCacheWriterConfig.

App/v1/AppV1CacheWriterConfig.cs
namespace Connector.App.v1;

[Title("App V1 Cache Writer Configuration")]
[Description("Configuration of the data object caches for the module.")]
public class AppV1CacheWriterConfig
{
    public HealthCheckConfig HealthCheckConfig { get; set; } = new();
    public IncrementalCacheWriterConfig ExampleObjectConfig { get; set; } = new();
}

Updating the Cache Writer Service Definition

The cache writer service definition registers all of the data readers for a module. For an incremental data reader, use the RegisterIncrementalDataReader method instead of the generic RegisterDataReader.

Currently, incremental data readers do not support change detection.

App/v1/AppV1CacheWriterDefinition.cs
public override void ConfigureService(ICacheWriterService service, AppV1CacheWriterConfig config)
{
    var dataReaderSettings = new DataReaderSettings
    {
        DisableDeletes = false,
        UseChangeDetection = false
    };
    service.RegisterDataReader<HealthCheckDataReader, HealthCheckDataObject>(ModuleId, config.HealthCheckConfig, dataReaderSettings);
    service.RegisterIncrementalDataReader<ExampleObjectDataReader, ExampleDataObject>(
        ModuleId, 
        config.ExampleObjectConfig, 
        new DataReaderSettings 
        { 
            DisableDeletes = false, 
            UseChangeDetection = false
        });
}

Updating the Data Reader

The data reader for the Example data object now needs to be updated to implement IIncrementalDataReader. The SDK provides IncrementalDataReaderBase, which you can inherit from instead of implementing the interface directly.

The checkpoint for this data reader is a date and time in UTC. After each successful run, the checkpoint is automatically set to the time (in UTC) at which the data reader started.

App/v1/Example/ExampleDataReader.cs
public class ExampleObjectDataReader : IncrementalDataReaderBase<ExampleObjectDataReader>
{
    private readonly ILogger<ExampleObjectDataReader> _logger;
    private readonly IHostContext _hostContext;
    private readonly IApiClient _apiClient;
    private readonly IncrementalCacheWriterConfig _config;

    public ExampleObjectDataReader(
        ILogger<ExampleObjectDataReader> logger,
        IHostContext hostContext,
        IApiClient client,
        AppV1CacheWriterConfig config)
    {
        _logger = logger;
        _hostContext = hostContext;
        _apiClient = client;
        _config = config.ExampleObjectConfig;
    }

    public override async IAsyncEnumerable<IDataReaderObject<ExampleDataObject>> GetTypedDataAsync(
        DataObjectCacheWriteArguments? dataObjectRunArguments, 
        [EnumeratorCancellation] CancellationToken cancellationToken) 
    {
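        // On the first run the checkpoint is null, so TryParse fails and leaves
        // modifiedSinceDateTime at DateTime.MinValue (0001-01-01).
        DateTime.TryParse(_config.Checkpoint, out var modifiedSinceDateTime);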
        var response = new ApiResponse<IEnumerable<ExampleDataObject?>>();
        try
        {
            response = await _apiClient.GetExamples(modifiedSinceDateTime, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            // Pass the exception itself so the stack trace is logged along with the message.
            _logger.LogError(exception, "API request failed: {Message}", exception.Message);
            throw;
        }

        if (response.Result != null && response.IsSuccessful)
        {
            foreach (var item in response.Result)
            {
                var resource = new ExampleDataObject()
                {
                    Age = item.Age,
                    Date = item.Date,
                    Name = item.Name,
                    Position = item.Position,
                    Id = item.Id,
                };
                yield return new DataReaderObject<ExampleDataObject>(resource, UpdateOperation.Upsert);
            }
        }
    }
}
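
Because the checkpoint is a UTC timestamp stored as a string, it can help to parse it explicitly as UTC and to distinguish "no checkpoint yet" from a real date. The following is a minimal sketch, assuming the persisted value is a string that DateTime can parse (for example ISO 8601); CheckpointParsing and ParseUtcCheckpoint are hypothetical names and not part of the SDK.

```csharp
using System;
using System.Globalization;

public static class CheckpointParsing
{
    // Returns the checkpoint as a UTC DateTime, or null when there is no
    // usable checkpoint (for example on the first run, when it is null).
    public static DateTime? ParseUtcCheckpoint(string? checkpoint)
    {
        var parsed = DateTime.TryParse(
            checkpoint,
            CultureInfo.InvariantCulture,
            // Treat values without an offset as UTC and normalize the result to UTC.
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
            out var value);

        return parsed ? value : (DateTime?)null;
    }
}
```

A reader could then call ParseUtcCheckpoint(_config.Checkpoint) and fall back to a full read, or to DateTime.MinValue, when the result is null.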

If you want to customize the checkpoint value, your reader can implement its own GetCheckpoint() method. You can see an example of that in Creating an Incremental Data Reader.

Updating the API Client

The API client needs to be updated to pass the target API's query parameter for retrieving a partial record set. An API client updated to include modifiedSinceDate could look something like this:

Client/ApiClient.cs
public async Task<ApiResponse<IEnumerable<ExampleDataObject?>>> GetExamples(DateTime modifiedSinceDate, CancellationToken cancellationToken = default)
{
    var queryParameter = new Dictionary<string, string>
    {
        { "date", modifiedSinceDate.ToString("yyyy-MM-dd") },
    };
    var queryString = QueryString.Create(queryParameter).ToString();
    var response = await _httpClient
        .GetAsync("api/example" + queryString, cancellationToken)
        .ConfigureAwait(false);

    return new ApiResponse<IEnumerable<ExampleDataObject?>>()
    {
        IsSuccessful = response.IsSuccessStatusCode,
        StatusCode = (int)response.StatusCode,
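        // ReadFromJsonAsync is an extension method from System.Net.Http.Json.
        Result = response.IsSuccessStatusCode ? await response.Content.ReadFromJsonAsync<IEnumerable<ExampleDataObject?>>(cancellationToken: cancellationToken).ConfigureAwait(false) : default,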
        RawResult = response.IsSuccessStatusCode ? null : await response.Content.ReadAsStreamAsync(),
    };
}
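
To make the request shape concrete, the sketch below builds the same relative URL using only base class library calls. The "api/example" path and "date" parameter mirror the client above; ExampleQuerySketch and BuildRelativeUrl are hypothetical names, and the use of InvariantCulture is a suggestion rather than something the SDK requires.

```csharp
using System;
using System.Globalization;

public static class ExampleQuerySketch
{
    // Builds the relative URL for the incremental request.
    public static string BuildRelativeUrl(DateTime modifiedSinceUtc)
    {
        // InvariantCulture keeps the Gregorian calendar regardless of the
        // host's current culture, so the API always receives the same format.
        var date = modifiedSinceUtc.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);
        return "api/example?date=" + Uri.EscapeDataString(date);
    }
}
```

For a checkpoint of 2024-05-01 23:59 UTC this yields api/example?date=2024-05-01. Because the format drops the time of day, records changed earlier on the checkpoint day may be returned again. That is likely harmless when they are written with UpdateOperation.Upsert, assuming the target API treats the date as inclusive.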