// sessions.js
  'use strict';

  const retrieveBSON = require('./connection/utils').retrieveBSON;
  const EventEmitter = require('events');
  const BSON = retrieveBSON();
  const Binary = BSON.Binary;
  const uuidV4 = require('./utils').uuidV4;
  const MongoError = require('./error').MongoError;
  const isRetryableError = require('././error').isRetryableError;
  const isRetryableEndTransactionError = require('././error').isRetryableEndTransactionError;
  const MongoNetworkError = require('./error').MongoNetworkError;
  const MongoWriteConcernError = require('./error').MongoWriteConcernError;
  const Transaction = require('./transactions').Transaction;
  const TxnState = require('./transactions').TxnState;
  const isPromiseLike = require('./utils').isPromiseLike;
  const ReadPreference = require('./topologies/read_preference');
  const maybePromise = require('../utils').maybePromise;
  const isTransactionCommand = require('./transactions').isTransactionCommand;
  const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
  const isSharded = require('./wireprotocol/shared').isSharded;
  const maxWireVersion = require('./utils').maxWireVersion;
  const now = require('./../utils').now;
  const calculateDurationInMs = require('./../utils').calculateDurationInMs;

  // Lowest wire version at which a sharded topology may start a transaction; below it
  // `startTransaction` fails with "Transactions are not supported on sharded clusters in
  // MongoDB < 4.2."
  const minWireVersionForShardedTransactions = 8;
  /**
   * Verifies that a session can still be used.
   *
   * When the session is unusable and a callback is supplied, the callback receives the
   * error and `false` is returned; without a callback the error is thrown instead.
   *
   * @ignore
   * @param {ClientSession} session The session to check
   * @param {Function} [callback] Optional callback that receives the error
   * @return {boolean} true if the session is still usable
   * @throws {MongoError} if the session has ended and no callback was supplied
   */
  function assertAlive(session, callback) {
    // `hasEnded` must be consulted first: `ClientSession#serverSession` is a lazy getter
    // that acquires a new server session whenever none is held, so the null check alone
    // can never detect a session that has already been ended.
    if (session.hasEnded || session.serverSession == null) {
      const error = new MongoError('Cannot use a session that has ended');
      if (typeof callback === 'function') {
        callback(error, null);
        return false;
      }

      throw error;
    }

    return true;
  }
  /**
   * Options to pass when creating a Client Session
   * @typedef {Object} SessionOptions
   * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
   * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
   */

  /**
   * A BSON document reflecting the lsid of a {@link ClientSession}
   * @typedef {Object} SessionId
   */
  // Key of the private slot holding the lazily acquired ServerSession of a ClientSession.
  const kServerSession = Symbol('serverSession');

  /**
   * A class representing a client session on the server
   * WARNING: not meant to be instantiated directly.
   * @class
   * @hideconstructor
   */
  class ClientSession extends EventEmitter {
    /**
     * Create a client session.
     * WARNING: not meant to be instantiated directly
     *
     * @param {Topology} topology The current client's topology (Internal Class)
     * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
     * @param {SessionOptions} [options] Optional settings
     * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
     */
    constructor(topology, sessionPool, options, clientOptions) {
      super();

      if (topology == null) {
        throw new Error('ClientSession requires a topology');
      }

      if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
        throw new Error('ClientSession requires a ServerSessionPool');
      }

      options = options || {};
      clientOptions = clientOptions || {};

      this.topology = topology;
      this.sessionPool = sessionPool;
      this.hasEnded = false;
      this.clientOptions = clientOptions;
      // no server session is taken from the pool until `serverSession` is first read
      this[kServerSession] = undefined;

      this.supports = {
        causalConsistency:
          typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
      };

      this.clusterTime = options.initialClusterTime;

      this.operationTime = null;
      this.explicit = !!options.explicit;
      this.owner = options.owner;
      this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
      this.transaction = new Transaction();
    }

    /**
     * The server id associated with this session
     * @type {SessionId}
     */
    get id() {
      return this.serverSession.id;
    }

    /**
     * The underlying server session. It is acquired from the pool on first access and
     * cached until the session is ended.
     * @type {ServerSession}
     */
    get serverSession() {
      if (this[kServerSession] == null) {
        this[kServerSession] = this.sessionPool.acquire();
      }

      return this[kServerSession];
    }

    /**
     * Ends this session on the server
     *
     * @param {Object} [options] Optional settings. Currently reserved for future use
     * @param {Function} [callback] Optional callback for completion of this operation
     */
    endSession(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }
      options = options || {};

      const session = this;
      return maybePromise(this, callback, done => {
        if (session.hasEnded) {
          return done();
        }

        function completeEndSession() {
          // Read the backing field rather than the `serverSession` getter: the getter
          // would acquire a brand-new server session just so it could be released here.
          const serverSession = session[kServerSession];
          if (serverSession != null) {
            // release the server session back to the pool
            session.sessionPool.release(serverSession);
            session[kServerSession] = undefined;
          }

          // mark the session as ended, and emit a signal
          session.hasEnded = true;
          session.emit('ended', session);

          // spec indicates that we should ignore all errors for `endSessions`
          done();
        }

        if (session[kServerSession] != null && session.inTransaction()) {
          session.abortTransaction(err => {
            if (err) return done(err);
            completeEndSession();
          });

          return;
        }

        completeEndSession();
      });
    }

    /**
     * Advances the operationTime for a ClientSession.
     *
     * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
     */
    advanceOperationTime(operationTime) {
      if (this.operationTime == null) {
        this.operationTime = operationTime;
        return;
      }

      // only ever move forward
      if (operationTime.greaterThan(this.operationTime)) {
        this.operationTime = operationTime;
      }
    }

    /**
     * Used to determine if this session equals another
     * @param {ClientSession} session
     * @return {boolean} true if the sessions are equal
     */
    equals(session) {
      if (!(session instanceof ClientSession)) {
        return false;
      }

      return this.id.id.buffer.equals(session.id.id.buffer);
    }

    /**
     * Increment the transaction number on the internal ServerSession
     */
    incrementTransactionNumber() {
      this.serverSession.txnNumber++;
    }

    /**
     * @returns {boolean} whether this session is currently in a transaction or not
     */
    inTransaction() {
      return this.transaction.isActive;
    }

    /**
     * Starts a new transaction with the given options.
     *
     * @param {TransactionOptions} options Options for the transaction
     * @throws {MongoError} if a transaction is already in progress, or if the topology is
     *   sharded and reports a wire version below the minimum for sharded transactions
     */
    startTransaction(options) {
      assertAlive(this);
      if (this.inTransaction()) {
        throw new MongoError('Transaction already in progress');
      }

      const topologyMaxWireVersion = maxWireVersion(this.topology);
      if (
        isSharded(this.topology) &&
        topologyMaxWireVersion != null &&
        topologyMaxWireVersion < minWireVersionForShardedTransactions
      ) {
        throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
      }

      // increment txnNumber
      this.incrementTransactionNumber();

      // create transaction state; explicit options replace (not merge with) the session defaults
      this.transaction = new Transaction(
        Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
      );

      this.transaction.transition(TxnState.STARTING_TRANSACTION);
    }

    /**
     * Commits the currently active transaction in this session.
     *
     * @param {Function} [callback] optional callback for completion of this operation
     * @return {Promise} A promise is returned if no callback is provided
     */
    commitTransaction(callback) {
      return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done));
    }

    /**
     * Aborts the currently active transaction in this session.
     *
     * @param {Function} [callback] optional callback for completion of this operation
     * @return {Promise} A promise is returned if no callback is provided
     */
    abortTransaction(callback) {
      return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done));
    }

    /**
     * This is here to ensure that ClientSession is never serialized to BSON.
     * @ignore
     */
    toBSON() {
      throw new Error('ClientSession cannot be serialized to BSON.');
    }

    /**
     * A user provided function to be run within a transaction
     *
     * @callback WithTransactionCallback
     * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
     * @returns {Promise} The resulting Promise of operations run within this transaction
     */

    /**
     * Runs a provided lambda within a transaction, retrying either the commit operation
     * or entire transaction as needed (and when the error permits) to better ensure that
     * the transaction can complete successfully.
     *
     * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
     * return a Promise will result in undefined behavior.
     *
     * @param {WithTransactionCallback} fn
     * @param {TransactionOptions} [options] Optional settings for the transaction
     */
    withTransaction(fn, options) {
      const startTime = now();
      return attemptTransaction(this, startTime, fn, options);
    }
  }
  // Upper bound, in milliseconds, on how long `withTransaction` keeps retrying.
  const MAX_WITH_TRANSACTION_TIMEOUT = 120000;

  // Server error codes consulted when classifying the outcome of a commit.
  const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
  const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
  const MAX_TIME_MS_EXPIRED_CODE = 50;

  // Write concern error code names that exclude an error from being labelled
  // `UnknownTransactionCommitResult` (see `isUnknownTransactionCommitResult`).
  const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
    'CannotSatisfyWriteConcern',
    'UnknownReplWriteConcern',
    'UnsatisfiableWriteConcern'
  ]);
  /**
   * Reports whether the time elapsed since `startTime` is still below `max`.
   *
   * @ignore
   * @param {*} startTime A start marker understood by `calculateDurationInMs`
   * @param {number} max The time budget in milliseconds
   * @return {boolean} true while the elapsed time is strictly less than `max`
   */
  function hasNotTimedOut(startTime, max) {
    const elapsedMs = calculateDurationInMs(startTime);
    return elapsedMs < max;
  }
  /**
   * Decides whether a failed commit should be labelled `UnknownTransactionCommitResult`.
   *
   * A maxTimeMS expiry always qualifies. Any other error qualifies unless its code name
   * is listed in `NON_DETERMINISTIC_WRITE_CONCERN_ERRORS` or its code is one of the
   * unsatisfiable / unknown-repl write concern codes.
   *
   * @ignore
   * @param {Error} err The error returned by the commit
   * @return {boolean} true if the commit outcome is unknown
   */
  function isUnknownTransactionCommitResult(err) {
    // mirror `isMaxTimeMSExpiredError`: a missing error is simply "not unknown"
    if (err == null) return false;

    if (isMaxTimeMSExpiredError(err)) {
      return true;
    }

    const isWriteConcernFailure =
      NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) ||
      err.code === UNSATISFIABLE_WRITE_CONCERN_CODE ||
      err.code === UNKNOWN_REPL_WRITE_CONCERN_CODE;

    return !isWriteConcernFailure;
  }
  /**
   * Reports whether an error, or the write concern error nested inside it, carries the
   * maxTimeMS-expired error code.
   *
   * @ignore
   * @param {Error} [err] The error to inspect
   * @return {boolean} true if the error is a maxTimeMS expiry; always a strict boolean
   */
  function isMaxTimeMSExpiredError(err) {
    if (err == null) return false;

    if (err.code === MAX_TIME_MS_EXPIRED_CODE) {
      return true;
    }

    // Coerce explicitly so a missing `writeConcernError` yields `false`, not `undefined`.
    return Boolean(err.writeConcernError) && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE;
  }
  /**
   * Commits the session's transaction on behalf of `withTransaction`, retrying while the
   * time budget allows.
   *
   * A `MongoError` labelled `UnknownTransactionCommitResult` retries only the commit; one
   * labelled `TransientTransactionError` restarts the whole transaction. Anything else,
   * a maxTimeMS expiry, or an exhausted time budget rethrows the error.
   *
   * @ignore
   * @param {ClientSession} session The session whose transaction is committed
   * @param {*} startTime Start marker captured when `withTransaction` was invoked
   * @param {Function} fn The user callback, re-run if the whole transaction is retried
   * @param {TransactionOptions} [options] Options for a restarted transaction
   * @return {Promise} Resolves with the commit (or retried transaction) result
   */
  function attemptTransactionCommit(session, startTime, fn, options) {
    return session.commitTransaction().catch(err => {
      const mayRetry =
        err instanceof MongoError &&
        hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
        !isMaxTimeMSExpiredError(err);

      if (mayRetry) {
        if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
          return attemptTransactionCommit(session, startTime, fn, options);
        }

        if (err.hasErrorLabel('TransientTransactionError')) {
          return attemptTransaction(session, startTime, fn, options);
        }
      }

      throw err;
    });
  }
  // Transaction states in which `withTransaction` skips its own commit after the user
  // callback resolves.
  const USER_EXPLICIT_TXN_END_STATES = new Set([
    TxnState.NO_TRANSACTION,
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_ABORTED
  ]);

  /**
   * Reports whether the session's transaction is in one of the
   * `USER_EXPLICIT_TXN_END_STATES`.
   *
   * @ignore
   * @param {ClientSession} session The session to inspect
   * @return {boolean} true if the transaction state is one of those end states
   */
  function userExplicitlyEndedTransaction(session) {
    const currentState = session.transaction.state;
    return USER_EXPLICIT_TXN_END_STATES.has(currentState);
  }
  /**
   * Runs one attempt of a `withTransaction` body: starts a transaction, invokes the user
   * callback, then commits unless the callback already ended the transaction itself.
   *
   * If the callback fails, an active transaction is aborted first. A `MongoError`
   * labelled `TransientTransactionError` then restarts the whole attempt while the time
   * budget allows; any other error is rethrown (a maxTimeMS expiry is additionally
   * labelled `UnknownTransactionCommitResult` when the error supports labels).
   *
   * @ignore
   * @param {ClientSession} session The session to run the transaction on
   * @param {*} startTime Start marker captured when `withTransaction` was invoked
   * @param {Function} fn The user callback; must return a Promise
   * @param {TransactionOptions} [options] Options passed to `startTransaction`
   * @return {Promise} Resolves once the transaction has been committed or ended by the user
   * @throws {TypeError} synchronously, if `fn` does not return a Promise
   */
  function attemptTransaction(session, startTime, fn, options) {
    session.startTransaction(options);

    let promise;
    try {
      promise = fn(session);
    } catch (err) {
      // surface synchronous failures through the same rejection path as async ones
      promise = Promise.reject(err);
    }

    if (!isPromiseLike(promise)) {
      // Best-effort cleanup: the TypeError below is what the caller needs to see, so a
      // failing abort is deliberately ignored instead of becoming an unhandled rejection.
      session.abortTransaction().catch(() => {});
      throw new TypeError('Function provided to `withTransaction` must return a Promise');
    }

    function maybeRetryOrThrow(err) {
      if (
        err instanceof MongoError &&
        err.hasErrorLabel('TransientTransactionError') &&
        hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
      ) {
        return attemptTransaction(session, startTime, fn, options);
      }

      // the user callback may reject with anything; only label errors that support labels
      if (isMaxTimeMSExpiredError(err) && typeof err.addErrorLabel === 'function') {
        err.addErrorLabel('UnknownTransactionCommitResult');
      }

      throw err;
    }

    return promise
      .then(() => {
        if (userExplicitlyEndedTransaction(session)) {
          return;
        }

        return attemptTransactionCommit(session, startTime, fn, options);
      })
      .catch(err => {
        if (session.transaction.isActive) {
          return session.abortTransaction().then(() => maybeRetryOrThrow(err));
        }

        return maybeRetryOrThrow(err);
      });
  }
  /**
   * Shared implementation of `commitTransaction` and `abortTransaction`.
   *
   * Validates the current transaction state, short-circuits when no command needs to be
   * sent, otherwise builds the command (write concern, maxTimeMS, recovery token), sends
   * it to `admin.$cmd`, and retries once when the error is a retryable end-transaction
   * error. Errors from `abortTransaction` commands are never reported to the callback.
   *
   * @ignore
   * @param {ClientSession} session The session whose transaction is being ended
   * @param {string} commandName Either 'commitTransaction' or 'abortTransaction'
   * @param {Function} callback Called with `(error, reply)` when the operation completes
   */
  function endTransaction(session, commandName, callback) {
    if (!assertAlive(session, callback)) {
      // checking result in case callback was called
      return;
    }

    const isCommit = commandName === 'commitTransaction';

    // handle any initial problematic cases
    const txnState = session.transaction.state;

    if (txnState === TxnState.NO_TRANSACTION) {
      callback(new MongoError('No transaction started'));
      return;
    }

    if (isCommit) {
      if (
        txnState === TxnState.STARTING_TRANSACTION ||
        txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
      ) {
        // the transaction was never started, we can safely exit here
        session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
        callback(null, null);
        return;
      }

      if (txnState === TxnState.TRANSACTION_ABORTED) {
        callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
        return;
      }
    } else {
      if (txnState === TxnState.STARTING_TRANSACTION) {
        // the transaction was never started, we can safely exit here
        session.transaction.transition(TxnState.TRANSACTION_ABORTED);
        callback(null, null);
        return;
      }

      if (txnState === TxnState.TRANSACTION_ABORTED) {
        callback(new MongoError('Cannot call abortTransaction twice'));
        return;
      }

      if (
        txnState === TxnState.TRANSACTION_COMMITTED ||
        txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
      ) {
        callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
        return;
      }
    }

    // construct and send the command
    const command = { [commandName]: 1 };

    // apply a writeConcern if specified
    let writeConcern;
    if (session.transaction.options.writeConcern) {
      writeConcern = Object.assign({}, session.transaction.options.writeConcern);
    } else if (session.clientOptions && session.clientOptions.w) {
      writeConcern = { w: session.clientOptions.w };
    }

    // committing again after a commit was already recorded: force a majority write
    // concern, with a default wtimeout the configured write concern may override
    if (txnState === TxnState.TRANSACTION_COMMITTED) {
      writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
    }

    if (writeConcern) {
      command.writeConcern = writeConcern;
    }

    if (isCommit && session.transaction.options.maxTimeMS) {
      command.maxTimeMS = session.transaction.options.maxTimeMS;
    }

    function commandHandler(e, r) {
      if (isCommit) {
        session.transaction.transition(TxnState.TRANSACTION_COMMITTED);

        if (e) {
          if (
            e instanceof MongoNetworkError ||
            e instanceof MongoWriteConcernError ||
            isRetryableError(e) ||
            isMaxTimeMSExpiredError(e)
          ) {
            if (isUnknownTransactionCommitResult(e)) {
              e.addErrorLabel('UnknownTransactionCommitResult');

              // per txns spec, must unpin session in this case
              session.transaction.unpinServer();
            }
          } else if (e.hasErrorLabel('TransientTransactionError')) {
            session.transaction.unpinServer();
          }
        }
      } else {
        session.transaction.transition(TxnState.TRANSACTION_ABORTED);
      }

      callback(e, r);
    }

    // The spec indicates that we should ignore all errors on `abortTransaction`
    function transactionError(err) {
      return isCommit ? err : null;
    }

    if (
      // Assumption here that commandName is "commitTransaction" or "abortTransaction"
      session.transaction.recoveryToken &&
      supportsRecoveryToken(session)
    ) {
      command.recoveryToken = session.transaction.recoveryToken;
    }

    // send the command
    session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
      if (err && isRetryableEndTransactionError(err)) {
        // SPEC-1185: apply majority write concern when retrying commitTransaction
        if (command.commitTransaction) {
          // per txns spec, must unpin session in this case
          session.transaction.unpinServer();

          command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
            w: 'majority'
          });
        }

        // single retry; its outcome is final
        return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
          commandHandler(transactionError(_err), _reply)
        );
      }

      commandHandler(transactionError(err), reply);
    });
  }
  /**
   * Reports whether the session's topology has the `useRecoveryToken` option enabled.
   *
   * @ignore
   * @param {ClientSession} session The session whose topology options are inspected
   * @return {boolean} true if `topology.s.options.useRecoveryToken` is truthy
   */
  function supportsRecoveryToken(session) {
    const topologyOptions = session.topology.s.options;
    return Boolean(topologyOptions.useRecoveryToken);
  }
  /**
   * Reflects the existence of a session on the server. Can be reused by the session pool.
   * WARNING: not meant to be instantiated directly. For internal use only.
   * @ignore
   */
  class ServerSession {
    constructor() {
      this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
      this.lastUse = now();
      this.txnNumber = 0;
      this.isDirty = false;
    }

    /**
     * Determines if the server session has timed out.
     * @ignore
     * @param {number} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
     * @return {boolean} true if the session has timed out.
     */
    hasTimedOut(sessionTimeoutMinutes) {
      // Convert the full idle duration to minutes. The duration must not be wrapped at
      // hour or day boundaries: a session idle for 65 minutes is 65 minutes idle, not 5.
      const idleTimeMinutes = Math.round(calculateDurationInMs(this.lastUse) / 60000);

      // treat the session as expired once it is within one minute of the server timeout
      return idleTimeMinutes > sessionTimeoutMinutes - 1;
    }
  }
  /**
   * Maintains a pool of Server Sessions.
   * For internal use only
   * @ignore
   */
  class ServerSessionPool {
    /**
     * @ignore
     * @param {Topology} topology The topology the pooled sessions belong to
     */
    constructor(topology) {
      if (topology == null) {
        throw new Error('ServerSessionPool requires a topology');
      }

      this.topology = topology;
      this.sessions = [];
    }

    /**
     * Ends all sessions in the session pool.
     * @ignore
     * @param {Function} [callback] Invoked once the pool has been emptied
     */
    endAllPooledSessions(callback) {
      if (this.sessions.length) {
        this.topology.endSessions(
          this.sessions.map(session => session.id),
          // no arguments are consumed: the outcome of `endSessions` is not inspected
          () => {
            this.sessions = [];
            if (typeof callback === 'function') {
              callback();
            }
          }
        );

        return;
      }

      if (typeof callback === 'function') {
        callback();
      }
    }

    /**
     * Acquire a Server Session from the pool.
     * Iterates through each session in the pool, removing any stale sessions
     * along the way. The first non-stale session found is removed from the
     * pool and returned. If no non-stale session is found, a new ServerSession
     * is created.
     * @ignore
     * @returns {ServerSession}
     */
    acquire() {
      const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
      while (this.sessions.length) {
        const session = this.sessions.shift();
        if (!session.hasTimedOut(sessionTimeoutMinutes)) {
          return session;
        }
      }

      return new ServerSession();
    }

    /**
     * Release a session to the session pool
     * Adds the session back to the session pool if the session has not timed out yet.
     * This method also removes any stale sessions from the pool.
     * @ignore
     * @param {ServerSession} session The session to release to the pool
     */
    release(session) {
      const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;

      // sessions are added at the front, so the least recently released ones sit at the tail
      while (this.sessions.length) {
        const pooledSession = this.sessions[this.sessions.length - 1];
        if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
          this.sessions.pop();
        } else {
          break;
        }
      }

      // nothing to return to the pool
      if (session == null) {
        return;
      }

      if (!session.hasTimedOut(sessionTimeoutMinutes)) {
        // dirty sessions are discarded rather than reused
        if (session.isDirty) {
          return;
        }

        // otherwise, readd this session to the session pool
        this.sessions.unshift(session);
      }
    }
  }
  // TODO: this should be codified in command construction
  // @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
  /**
   * Determines whether a command may carry a `readConcern`.
   *
   * @ignore
   * @param {Object} command The command document, keyed by command name
   * @param {Object} [options] Options of the calling operation; only `out` is consulted
   * @return {boolean} true for the listed read commands and for an inline `mapReduce`
   */
  function commandSupportsReadConcern(command, options) {
    const isReadCommand =
      command.aggregate ||
      command.count ||
      command.distinct ||
      command.find ||
      command.parallelCollectionScan ||
      command.geoNear ||
      command.geoSearch;

    if (isReadCommand) {
      return true;
    }

    // mapReduce only qualifies when its results are returned inline
    const isInlineMapReduce =
      command.mapReduce &&
      options &&
      options.out &&
      (options.out.inline === 1 || options.out === 'inline');

    if (isInlineMapReduce) {
      return true;
    }

    return false;
  }
  /**
   * Optionally decorate a command with sessions specific keys
   *
   * Errors are returned rather than thrown.
   *
   * @ignore
   * @param {ClientSession} session the session tracking transaction state
   * @param {Object} command the command to decorate
   * @param {Object} [options] Optional settings passed to calling operation
   * @return {MongoError|undefined} An error, if some error condition was met
   */
  function applySession(session, command, options) {
    if (session.hasEnded) {
      // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
      return new MongoError('Cannot use a session that has ended');
    }

    // `options` is optional; normalise once so every later read is safe
    const opts = options || {};

    // SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility
    if (opts.writeConcern && opts.writeConcern.w === 0) {
      return;
    }

    const serverSession = session.serverSession;
    serverSession.lastUse = now();
    command.lsid = serverSession.id;

    // first apply non-transaction-specific sessions data
    const inTransaction = session.inTransaction() || isTransactionCommand(command);
    const isRetryableWrite = opts.willRetryWrite;
    const shouldApplyReadConcern = commandSupportsReadConcern(command, opts);

    if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
      command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
    }

    // now attempt to apply transaction-specific sessions data
    if (!inTransaction) {
      if (session.transaction.state !== TxnState.NO_TRANSACTION) {
        session.transaction.transition(TxnState.NO_TRANSACTION);
      }

      // TODO: the following should only be applied to read operation per spec.
      // for causal consistency
      if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
        // build a fresh object so a readConcern shared with the caller is never mutated
        command.readConcern = Object.assign({}, command.readConcern, {
          afterClusterTime: session.operationTime
        });
      }

      return;
    }

    if (opts.readPreference && !opts.readPreference.equals(ReadPreference.primary)) {
      return new MongoError(
        `Read preference in a transaction must be primary, not: ${opts.readPreference.mode}`
      );
    }

    // `autocommit` must always be false to differentiate from retryable writes
    command.autocommit = false;

    if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
      session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
      command.startTransaction = true;

      const readConcern =
        session.transaction.options.readConcern || session.clientOptions.readConcern;
      if (readConcern) {
        command.readConcern = readConcern;
      }

      if (session.supports.causalConsistency && session.operationTime) {
        // Copy before adding `afterClusterTime`: `readConcern` above is the very object
        // stored in the transaction / client options and must not pick up per-command state.
        command.readConcern = Object.assign({}, command.readConcern, {
          afterClusterTime: session.operationTime
        });
      }
    }
  }
  /**
   * Updates session state from a server response: cluster time, operation time (when
   * causal consistency is enabled) and the recovery token (when inside a transaction).
   *
   * @ignore
   * @param {ClientSession} [session] The session to update; nothing happens without one
   * @param {Object} [document] The server response document
   */
  function updateSessionFromResponse(session, document) {
    // every update below targets the session, so there is nothing to do without one
    if (session == null || document == null) {
      return;
    }

    if (document.$clusterTime) {
      resolveClusterTime(session, document.$clusterTime);
    }

    if (document.operationTime && session.supports.causalConsistency) {
      session.advanceOperationTime(document.operationTime);
    }

    if (document.recoveryToken && session.inTransaction()) {
      session.transaction._recoveryToken = document.recoveryToken;
    }
  }
  660. module.exports = {
  661. ClientSession,
  662. ServerSession,
  663. ServerSessionPool,
  664. TxnState,
  665. applySession,
  666. updateSessionFromResponse,
  667. commandSupportsReadConcern
  668. };