React Native + RxDB: как строить Local-First приложения. Архитектура и примеры


- Как устроена двусторонняя синхронизация между клиентом и сервером?
- Как работает обработка конфликтов?
- Что важно учитывать при разработке таких решений?
Стек технологий
- Клиент: React Native + react-native-nitro-sqlite + RxDB;
- Сервер: NestJS + TypeORM + PostgreSQL;
- React Native – фреймворк для кроссплатформенной разработки мобильных приложений, который позволяет создавать нативные интерфейсы для iOS и Android с использованием JavaScript и React;
- react-native-nitro-sqlite – библиотека для React Native, которая обеспечивает управление локальными базами данных SQLite на устройстве;
- RxDB – клиентская база данных, ориентированная на оффлайн-режим и реактивную работу с данными. Она хранит документы локально, позволяет выполнять запросы с подпиской на изменения и поддерживает автоматическую синхронизацию с удалённым сервером;
- NestJS – фреймворк для создания серверных приложений на Node.js;
- TypeORM – ORM-библиотека для TypeScript и JavaScript, которая обеспечивает удобную работу с реляционными базами данных;
- PostgreSQL – объектно-реляционная система управления базами данных.
Локальное хранилище RxDB через SQLite
import {
getRxStorageSQLiteTrial,
getSQLiteBasicsQuickSQLite,
} from 'rxdb/plugins/storage-sqlite';
import { open } from 'react-native-nitro-sqlite';
import { wrappedValidateAjvStorage } from 'rxdb/plugins/validate-ajv';
import { wrappedKeyEncryptionCryptoJsStorage } from 'rxdb/plugins/encryption-crypto-js';
const sqliteBasics = getSQLiteBasicsQuickSQLite(open);
const storage = getRxStorageSQLiteTrial({ sqliteBasics });
const validatedStorage = wrappedValidateAjvStorage({ storage });
const encryptedStorage = wrappedKeyEncryptionCryptoJsStorage({
storage: validatedStorage,
});
export { encryptedStorage };
Создание экземпляра базы данных
import { addRxPlugin, createRxDatabase, RxDatabase, WithDeleted } from 'rxdb';
import { RxDBDevModePlugin } from 'rxdb/plugins/dev-mode';
import { replicateRxCollection } from 'rxdb/plugins/replication';
import NetInfo from '@react-native-community/netinfo';
import {
CheckPointType,
MyDatabaseCollections,
ReplicateCollectionDto,
} from './types.ts';
import { encryptedStorage } from './storage.ts';
import { defaultConflictHandler } from './utills.ts';
import { usersApi, userSchema, UserType } from '../features/users';
import { RxDBMigrationSchemaPlugin } from 'rxdb/plugins/migration-schema';
import { RxDBQueryBuilderPlugin } from 'rxdb/plugins/query-builder';
import { RxDBUpdatePlugin } from 'rxdb/plugins/update';
// for support query.update method
addRxPlugin(RxDBUpdatePlugin);
// for support chained query methods
addRxPlugin(RxDBQueryBuilderPlugin);
// for enabling data migration
addRxPlugin(RxDBMigrationSchemaPlugin);
export class RxDatabaseManager {
private static instance: RxDatabaseManager;
private db: RxDatabase<MyDatabaseCollections> | null = null;
private isOnline = false;
private constructor() {}
public static getInstance(): RxDatabaseManager {
if (!RxDatabaseManager.instance) {
RxDatabaseManager.instance = new RxDatabaseManager();
}
return RxDatabaseManager.instance;
}
public async init(): Promise<RxDatabase<MyDatabaseCollections>> {
if (this.db) return this.db;
if (__DEV__) {
// needs to be added in dev mode
addRxPlugin(RxDBDevModePlugin);
}
this.db = await createRxDatabase<MyDatabaseCollections>({
name: 'myDb',
storage: encryptedStorage,
multiInstance: false, // No multi-instance support for React Native
closeDuplicates: true, // Close duplicate database instances
});
await this.db.addCollections({
users: {
schema: userSchema,
conflictHandler: defaultConflictHandler,
migrationStrategies: {
// 1: function (oldDoc: UserType) {},
},
},
});
this.setupConnectivityListener();
return this.db;
}
public getDb(): RxDatabase<MyDatabaseCollections> {
if (!this.db) {
throw new Error('Database not initialized. Call init() first.');
}
return this.db;
}
private replicateCollection<T>(dto: ReplicateCollectionDto<T>) {
const { collection, replicationId, api } = dto;
const replicationState = replicateRxCollection<WithDeleted<T>, number>({
collection: collection,
replicationIdentifier: replicationId,
pull: {
async handler(checkpointOrNull: unknown, batchSize: number) {
const typedCheckpoint = checkpointOrNull as CheckPointType;
const updatedAt = typedCheckpoint ? typedCheckpoint.updatedAt : 0;
const id = typedCheckpoint ? typedCheckpoint.id : '';
const response = await api.pull({ updatedAt, id, batchSize });
return {
documents: response.data.documents,
checkpoint: response.data.checkpoint,
};
},
batchSize: 20,
},
push: {
async handler(changeRows) {
console.log('push');
const response = await api.push({ changeRows });
return response.data;
},
},
});
replicationState.active$.subscribe(v => {
console.log('Replication active$:', v);
});
replicationState.canceled$.subscribe(v => {
console.log('Replication canceled$:', v);
});
replicationState.error$.subscribe(async error => {
console.error('Replication error$:', error);
});
}
private async startReplication() {
const db = this.getDb();
this.replicateCollection<UserType>({
collection: db.users,
replicationId: '/users/sync',
api: {
push: usersApi.push,
pull: usersApi.pull,
},
});
}
private setupConnectivityListener() {
NetInfo.addEventListener(state => {
const wasOffline = !this.isOnline;
this.isOnline = Boolean(state.isConnected);
if (this.isOnline && wasOffline) {
this.onReconnected();
}
});
}
private async onReconnected() {
this.startReplication();
}
}
useEffect(() => {
const init = async () => {
const dbManager = RxDatabaseManager.getInstance();
dbManager
.init()
.then(() => {
setAppStatus('ready');
})
.catch((error) => {
console.log('Error initializing database:', error);
setAppStatus('error');
});
};
init();
}, []);
Репликация данных: синхронизация клиента и сервера
async handler(checkpointOrNull: unknown, batchSize: number) {
const typedCheckpoint = checkpointOrNull as CheckPointType;
const updatedAt = typedCheckpoint ? typedCheckpoint.updatedAt : 0;
const id = typedCheckpoint ? typedCheckpoint.id : '';
const response = await api.pull({ updatedAt, id, batchSize });
return {
documents: response.data.documents,
checkpoint: response.data.checkpoint,
};
},
async pull(dto: PullUsersDto): Promise<UserViewDto[]> {
const { id, updatedAt, batchSize } = dto;
const users = await this.users
.createQueryBuilder('user')
.where('user.updatedAt > :updatedAt', { updatedAt })
.orWhere('user.updatedAt = :updatedAt AND user.id > :id', {
updatedAt,
id,
})
.orderBy('user.updatedAt', 'ASC')
.addOrderBy('user.id', 'ASC')
.limit(batchSize)
.getMany();
return users.map(UserViewDto.mapToView);
}
async pull(dto: PullUsersDto) {
const users = await this.usersRepository.pull(dto);
const newCheckpoint =
users.length === 0
? { id: dto.id, updatedAt: dto.updatedAt }
: {
id: users.at(-1)!.id,
updatedAt: users.at(-1)!.updatedAt,
};
return {
documents: users,
checkpoint: newCheckpoint,
};
}
async handler(changeRows) {
const response = await api.push({ changeRows });
return response.data;
},
- Применяет его, если нет конфликта;
- Возвращает конфликт, если updatedAt этого документа на сервере новый.
async push(dto: PushUsersDto) {
const changeRows = dto.changeRows;
const existingUsers = await this.usersRepository.findByIds(
changeRows.map((changeRow) => changeRow.newDocumentState.id),
);
const existingMap = new Map(existingUsers.map((user) => [user.id, user]));
const toSave: UserViewDto[] = [];
const conflicts: UserViewDto[] = [];
for (const changeRow of changeRows) {
const newDoc = changeRow.newDocumentState;
const existing = existingMap.get(newDoc.id);
const isConflict = existing && existing.updatedAt > newDoc?.updatedAt;
if (isConflict) {
conflicts.push(existing);
} else {
toSave.push(newDoc);
}
if (toSave.length > 0) {
await this.usersRepository.save(toSave);
}
}
return conflicts;
}
Разрешение конфликтов
export const defaultConflictHandler: RxConflictHandler<{
updatedAt: number;
}> = {
isEqual(a, b) {
return a.updatedAt === b.updatedAt;
},
resolve({ assumedMasterState, realMasterState, newDocumentState }) {
return Promise.resolve(realMasterState);
},
};
Использование данных в приложении
export const UsersScreen = () => {
const users = useUsersSelector({
sort: [{ updatedAt: 'desc' }],
});
const { createUser, deleteUser, updateUser } = useUsersService();
return (
<View style={styles.container}>
{users.map(user => (
<Text key={user.id}>{user.name}</Text>
))}
<Button title={'Create new user'} onPress={createUser} />
<Button
disabled={users.length === 0}
title={'Update user'}
onPress={() => updateUser(users[0].id)}
/>
<Button
disabled={users.length === 0}
title={'Delete user'}
onPress={() => deleteUser(users[0].id)}
/>
</View>
);
};
export const useUsersSelector = (query?: MangoQuery<UserType>) => {
const userModel = RxDatabaseManager.getInstance().getDb().users;
const [users, setUsers] = useState<UserType[]>([]);
useEffect(() => {
const subscription = userModel.find(query).$.subscribe(result => {
setUsers(result);
});
return () => subscription.unsubscribe();
}, [userModel, query]);
return users;
};