Skip to content

Commit 344ee3e

Browse files
Apollon77 and claude authored
fix(node): preserve original write-decline error and roll back local cache (#3678)
* fix(general): surface original commit-phase-2 errors to transaction caller

  Tx#executeCommit2 used to wrap any phase-2 participant failure in a generic FinalizationError carrying only the participant name. The original error (e.g. a StatusResponseError returned by a remote device declining a write) was logged but never reached the caller, so ClientNode setStateOf / agent.state / endpoint.set callers had no way to detect the device's status code.

  throwIfErrored now rethrows the original error when a single participant fails, and aggregates with MatterAggregateError when multiple do. The rollback path's mislabeled "in commit phase 2" message is corrected.

  Adds direct unit tests for the new contract (TransactionTest) and end-to-end ClientNode tests covering single rejection, partial multi-attribute rejection, and full multi-attribute rejection across two clusters, plus a server-side regression test for the preCommit path.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(node): roll back local client cache on declined remote writes

  After PR #3676 the original StatusResponseError reaches the caller of ClientNode setStateOf / agent.state / endpoint.set, but the local client-side cache still held the value the user attempted to write. Subscriptions only deliver server-side changes, so the divergence persisted until restart.

  RemoteWriteParticipant now tracks a per-cluster snapshot (previousValues + writtenValues + compensator) per set() call. On commit2 failure the new RemoteWriter onFailure callback delivers per-attribute statuses; the participant groups failures by endpoint+cluster and invokes the compensator with the failed ids.
  DatasourceCache implements the compensator: it restores the pre-write value via the existing externalSet path (so persistence + listeners fire as if a subscription update arrived) but only when:
  - a baseline previous value was captured for the key, AND
  - the current local value still equals what was just written (skip restoration if a concurrent subscription update changed it).

  Adds three end-to-end tests covering single decline, partial decline across two clusters, and full decline. The tests fail when the compensate body is disabled, so they verify compensation rather than relying on subscription delivery.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(node): use reference identity for write-decline compensation guard

  Datasource keeps user-written values by reference and integrateExternalChange replaces a reference only when a subscription actually delivers a changed value (its own no-op deep-equal filter strips identical re-deliveries). The guard in DatasourceCache.compensate therefore only needs to check `current[id] !== writtenValues[id]` instead of running a full deep-equal on every key — the ref check captures real concurrent updates without the extra walk.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d80bf7a commit 344ee3e

6 files changed

Lines changed: 522 additions & 28 deletions

File tree

packages/general/src/transaction/Tx.ts

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import { Diagnostic } from "#log/Diagnostic.js";
88
import { Logger } from "#log/Logger.js";
9-
import { ImplementationError, ReadOnlyError } from "#MatterError.js";
9+
import { ImplementationError, MatterAggregateError, ReadOnlyError } from "#MatterError.js";
1010
import { Time, Timer } from "#time/Time.js";
1111
import { Timestamp } from "#time/Timestamp.js";
1212
import { Millis } from "#time/TimeUnit.js";
@@ -689,7 +689,7 @@ class Tx implements Transaction, Transaction.Finalization {
689689
#executeCommit2() {
690690
// Commit phase 2
691691
this.#status = Status.CommittingPhaseTwo;
692-
let errored: undefined | Array<Participant>;
692+
let errored: undefined | Array<ParticipantError>;
693693
let ongoing: undefined | Array<Promise<void>>;
694694
for (const participant of this.participants) {
695695
const promise = MaybePromise.then(
@@ -699,9 +699,9 @@ class Tx implements Transaction, Transaction.Finalization {
699699
logger.error(`Error committing (phase two) ${participant}, state inconsistency possible:`, error);
700700

701701
if (errored) {
702-
errored.push(participant);
702+
errored.push({ participant, error });
703703
} else {
704-
errored = [participant];
704+
errored = [{ participant, error }];
705705
}
706706
},
707707
);
@@ -765,7 +765,7 @@ class Tx implements Transaction, Transaction.Finalization {
765765
*/
766766
#executeRollback() {
767767
this.#status = Status.RollingBack;
768-
let errored: undefined | Array<Participant>;
768+
let errored: undefined | Array<ParticipantError>;
769769
let ongoing: undefined | Array<Promise<void>>;
770770

771771
for (const participant of this.participants) {
@@ -777,9 +777,9 @@ class Tx implements Transaction, Transaction.Finalization {
777777
logger.error(`Error rolling back ${participant}, state inconsistency possible:`, error);
778778

779779
if (errored) {
780-
errored.push(participant);
780+
errored.push({ participant, error });
781781
} else {
782-
errored = [participant];
782+
errored = [{ participant, error }];
783783
}
784784
},
785785
);
@@ -796,7 +796,7 @@ class Tx implements Transaction, Transaction.Finalization {
796796

797797
const finished = () => {
798798
this.#status = Status.Shared;
799-
throwIfErrored(errored, "in commit phase 2");
799+
throwIfErrored(errored, "during rollback");
800800
};
801801

802802
if (ongoing) {
@@ -842,16 +842,21 @@ class Tx implements Transaction, Transaction.Finalization {
842842
}
843843
}
844844

845-
/**
 * A participant paired with the error it raised during a finalization phase.
 */
interface ParticipantError {
    participant: Participant;
    error: unknown;
}

/**
 * Throws if any participant failed during a finalization phase (commit phase two or rollback).
 *
 * A single failure is rethrown unchanged, so callers can inspect the original error (its type, status code, etc.).
 * Several failures are combined into a {@link MatterAggregateError} whose message names every failing participant.
 *
 * @param errored failures collected during the phase; undefined or empty when nothing failed
 * @param when phrase describing the phase, embedded in the aggregate error message
 */
function throwIfErrored(errored: undefined | Array<ParticipantError>, when: string) {
    if (errored === undefined || errored.length === 0) {
        return;
    }

    const [first, ...rest] = errored;
    if (rest.length === 0) {
        // Surface the original error untouched rather than wrapping it
        throw first.error;
    }

    const errors = errored.map(entry => entry.error);
    const names = errored.map(entry => entry.participant.toString());
    throw new MatterAggregateError(errors, `Errors ${when} participants ${describeList("and", ...names)}`);
}
857862

packages/general/test/transaction/TransactionTest.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
import { Lifetime } from "#index.js";
8+
import { MatterAggregateError } from "#MatterError.js";
89
import {
910
FinalizationError,
1011
SynchronousTransactionConflictError,
@@ -369,6 +370,60 @@ describe("Transaction", () => {
369370
});
370371
});
371372

373+
describe("propagates commit phase 2 errors to the caller", () => {
    test("rethrows the original error when a single participant fails", async () => {
        const failure = new SomeError("phase 2 boom");
        const participant = join({
            async commit2() {
                throw failure;
            },
        });

        await transaction.begin();
        await expect(transaction.commit()).rejectedWith(failure);

        participant.expect("commit1", "commit2");
        validateUnlocked(transaction);
    });

    test("aggregates with MatterAggregateError when multiple participants fail", async () => {
        const firstFailure = new SomeError("phase 2 boom 1");
        const secondFailure = new SomeError("phase 2 boom 2");

        const first = TestParticipant({
            async commit2() {
                throw firstFailure;
            },
        });
        first.toString = () => "P1";

        const second = TestParticipant({
            async commit2() {
                throw secondFailure;
            },
        });
        second.toString = () => "P2";

        transaction.addParticipants(first, second);
        await transaction.begin();

        // Capture the rejection so its shape can be inspected below
        let thrown: unknown;
        try {
            await transaction.commit();
        } catch (error) {
            thrown = error;
        }

        expect(thrown).instanceOf(MatterAggregateError);
        expect((thrown as MatterAggregateError).errors).deep.equals([firstFailure, secondFailure]);

        first.expect("commit1", "commit2");
        second.expect("commit1", "commit2");
        validateUnlocked(transaction);
    });
});
426+
372427
describe("locks and unlocks resource", () => {
373428
describe("asynchronously", () => {
374429
test("on becoming exclusive & committing", async () => {

packages/node/src/storage/client/DatasourceCache.ts

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import type { RemoteWriter } from "./RemoteWriter.js";
1818
*
1919
* This implements storage for attribute values for a single cluster loaded from peers.
2020
*/
21-
export class DatasourceCache implements Datasource.ExternallyMutableStore {
21+
export class DatasourceCache implements Datasource.ExternallyMutableStore, RemoteWriteParticipant.Compensator {
2222
#writer: RemoteWriter;
2323
#endpointNumber: EndpointNumber;
2424
#behaviorId: string;
@@ -61,7 +61,43 @@ export class DatasourceCache implements Datasource.ExternallyMutableStore {
6161
participant = new RemoteWriteParticipant(this.#writer);
6262
transaction.addParticipants(participant);
6363
}
64-
(participant as RemoteWriteParticipant).set(this.#endpointNumber, this.#behaviorId, values);
64+
const previousValues = this.#consumer?.readValues(new Set(Object.keys(values))) ?? {};
65+
(participant as RemoteWriteParticipant).set(this.#endpointNumber, this.#behaviorId, values, {
66+
compensator: this,
67+
previousValues,
68+
});
69+
}
70+
71+
/**
 * Reverts local cache entries whose remote write the peer declined.
 *
 * An attribute is reverted only when both of these hold:
 * - a pre-write value was captured for it, and
 * - the cache still holds the exact value we attempted to write. A different value means a concurrent update
 *   already superseded it, so that attribute is left alone.
 *
 * Restoration goes through {@link externalSet}. Nothing happens once the cache is erased or reclaimed, or while it
 * has no consumer.
 */
async compensate(failedAttributeIds: Set<string>, previousValues: Val.Struct, writtenValues: Val.Struct) {
    if (this.#erased || this.#reclaimed) {
        return;
    }
    const consumer = this.#consumer;
    if (!consumer) {
        return;
    }

    const current = consumer.readValues(failedAttributeIds);

    // Datasource keeps user values by reference and only swaps the reference when a subscription delivers a
    // genuinely changed value, so reference inequality signals a concurrent update.
    const revertible = [...failedAttributeIds].filter(
        id => id in previousValues && current[id] === writtenValues[id],
    );
    if (revertible.length === 0) {
        return;
    }

    await this.externalSet(new Map<string, Val>(revertible.map((id): [string, Val] => [id, previousValues[id]])));
}
66102

67103
async externalSet(values: Val.StructMap) {

packages/node/src/storage/client/RemoteWriteParticipant.ts

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
* SPDX-License-Identifier: Apache-2.0
55
*/
66

7-
import { Transaction } from "@matter/general";
8-
import { Val } from "@matter/protocol";
7+
import { Logger, Transaction } from "@matter/general";
8+
import { Val, WriteResult } from "@matter/protocol";
99
import { EndpointNumber } from "@matter/types";
1010
import type { RemoteWriter } from "./RemoteWriter.js";
1111

12+
// Module-scoped logger; used below to warn when restoring local state after a declined write fails
const logger = Logger.get("RemoteWriteParticipant");
13+
1214
/**
1315
* A transaction participant that persists changes to a remote node.
1416
*
@@ -17,6 +19,7 @@ import type { RemoteWriter } from "./RemoteWriter.js";
1719
export class RemoteWriteParticipant implements Transaction.Participant {
1820
#request: RemoteWriter.Request = [];
1921
#writer: RemoteWriter;
22+
#snapshots = new Map<string, RemoteWriteParticipant.Snapshot>();
2023

2124
/**
2225
* There is one participant for each transaction/writer pair. We use the writer function itself as the dedup key.
@@ -27,13 +30,41 @@ export class RemoteWriteParticipant implements Transaction.Participant {
2730

2831
/**
 * Queue an attribute update for the remote write request.
 *
 * When a {@link snapshot} is supplied, the participant also records the pre-write values. It also records the
 * {@link RemoteWriteParticipant.Compensator} to invoke should the remote reject the write. Repeated calls for the
 * same endpoint/behavior within one transaction merge into one snapshot. Written values accumulate, with later
 * writes winning. For baselines, the first value captured for each attribute is kept.
 */
set(
    endpointNumber: EndpointNumber,
    behaviorId: string,
    values: Val.Struct,
    snapshot?: RemoteWriteParticipant.SnapshotInput,
) {
    this.#request.push({ number: endpointNumber, behaviorId, values });

    if (!snapshot) {
        return;
    }

    const key = snapshotKey(endpointNumber, behaviorId);
    const existing = this.#snapshots.get(key);
    if (!existing) {
        this.#snapshots.set(key, {
            compensator: snapshot.compensator,
            previousValues: { ...snapshot.previousValues },
            writtenValues: { ...values },
        });
        return;
    }

    Object.assign(existing.writtenValues, values);
    // The first baseline captured in this transaction wins; later writes must not overwrite it
    for (const [attributeId, previous] of Object.entries(snapshot.previousValues)) {
        if (!(attributeId in existing.previousValues)) {
            existing.previousValues[attributeId] = previous;
        }
    }
}
3869

3970
async commit2() {
@@ -42,13 +73,48 @@ export class RemoteWriteParticipant implements Transaction.Participant {
4273
}
4374

4475
const request = this.#request;
76+
const snapshots = this.#snapshots;
4577
this.#request = [];
78+
this.#snapshots = new Map();
4679

47-
await this.#writer(request);
80+
await this.#writer(request, failures => this.#compensate(snapshots, failures));
81+
}
82+
83+
/**
 * Invoked by the writer with per-attribute failures after a remote write was declined.
 *
 * Failures are grouped by endpoint|cluster, and each matching snapshot's compensator is invoked with the declined
 * attribute ids. A compensator that throws is logged and does not prevent the remaining snapshots from being
 * processed.
 */
async #compensate(
    snapshots: Map<string, RemoteWriteParticipant.Snapshot>,
    failures: WriteResult.AttributeStatus[],
) {
    if (snapshots.size === 0) {
        return;
    }

    // Index declined attribute ids by the same endpoint|cluster key the snapshots use
    const declined = new Map<string, Set<string>>();
    for (const { path } of failures) {
        const key = snapshotKey(path.endpointId, String(path.clusterId));
        let ids = declined.get(key);
        if (ids === undefined) {
            ids = new Set();
            declined.set(key, ids);
        }
        ids.add(String(path.attributeId));
    }

    for (const [key, { compensator, previousValues, writtenValues }] of snapshots) {
        const failedIds = declined.get(key);
        if (failedIds === undefined || failedIds.size === 0) {
            continue;
        }
        try {
            await compensator.compensate(failedIds, previousValues, writtenValues);
        } catch (compensateError) {
            logger.warn(`Failed to restore local state for ${key} after remote write decline:`, compensateError);
        }
    }
}
49114

50115
/**
 * Discards everything queued for this transaction, including compensation snapshots.
 */
rollback() {
    this.#snapshots = new Map();
    this.#request = [];
}
53119

54120
toString() {
@@ -59,3 +125,42 @@ export class RemoteWriteParticipant implements Transaction.Participant {
59125
this.#writer = writer;
60126
}
61127
}
128+
129+
/**
 * Builds the map key that identifies one behavior (cluster) on one endpoint, formatted as "endpoint|behavior".
 */
function snapshotKey(endpointNumber: EndpointNumber | number, behaviorId: string) {
    return String(endpointNumber) + "|" + behaviorId;
}
132+
133+
export namespace RemoteWriteParticipant {
    /**
     * Caller-supplied input to {@link RemoteWriteParticipant.set}. It holds the attribute values as they were before
     * the write, plus the {@link Compensator} that can put them back.
     */
    export interface SnapshotInput {
        previousValues: Val.Struct;
        compensator: Compensator;
    }

    /**
     * Per endpoint/cluster bookkeeping the participant holds for the duration of a transaction.
     */
    export interface Snapshot {
        previousValues: Val.Struct;
        writtenValues: Val.Struct;
        compensator: Compensator;
    }

    /**
     * Restores local cache state for attribute writes the remote device declined.
     *
     * Called with the ids of the declined attributes. Implementations should restore only ids that have an entry in
     * `previousValues`, and only while the local value still equals the one in `writtenValues`. This leaves values
     * that were changed concurrently untouched.
     */
    export interface Compensator {
        compensate(failedAttributeIds: Set<string>, previousValues: Val.Struct, writtenValues: Val.Struct): Promise<void>;
    }
}

0 commit comments

Comments
 (0)