streamMessage method

@override
Stream<String> streamMessage({
  required String systemPrompt,
  required List<Map<String, String>> messages,
  int maxTokens = 2048,
})
override

Stream a response, yielding text deltas as they arrive.

Implementation

/// Streams a model response for the given conversation, yielding each text
/// delta as it is produced.
///
/// Calls are serialized through [_inferenceLock]: each invocation installs
/// its own future as the new lock and waits on the one it replaced before
/// touching the engine.
///
/// [systemPrompt] is added first when non-empty. Each entry of [messages] is
/// read via its `role` and `content` keys; a missing role is treated as
/// `user`, and entries with missing or empty content are skipped.
/// [maxTokens] is forwarded to the generation call.
///
/// Timing figures are logged under `LocalLlm.perf`. The chat is disposed and
/// the lock released in a `finally` block, so both happen whether generation
/// completes or throws; a failure while disposing is logged, not rethrown.
@override
Stream<String> streamMessage({
  required String systemPrompt,
  required List<Map<String, String>> messages,
  int maxTokens = 2048,
}) async* {
  // Swap our own gate in synchronously, before the first await, so a caller
  // arriving next chains onto us rather than onto our predecessor.
  final predecessor = _inferenceLock;
  final gate = Completer<void>();
  _inferenceLock = gate.future;

  final timer = Stopwatch();
  var ttftMs = -1; // Stays negative until the first TokenEvent is seen.
  var emitted = 0;
  EngineChat? session;

  try {
    await predecessor;
    // Start timing only after the wait, so time spent queued is excluded.
    timer.start();

    final engine = await _ensureEngineLoaded();
    session = await engine.createChat();
    if (systemPrompt.isNotEmpty) {
      session.addSystem(systemPrompt);
    }

    // Replay the conversation history into the fresh chat.
    for (final entry in messages) {
      final text = entry['content'] ?? '';
      if (text.isNotEmpty) {
        switch (entry['role'] ?? 'user') {
          case 'assistant':
            session.addAssistant(text);
          case 'system':
            session.addSystem(text);
          default:
            session.addUser(text);
        }
      }
    }

    final events = session.generate(
      sampler: const SamplerParams(temperature: 0.7, topK: 40, topP: 0.95),
      maxTokens: maxTokens,
    );
    await for (final event in events) {
      switch (event) {
        case TokenEvent(:final text):
          if (ttftMs < 0) {
            ttftMs = timer.elapsedMilliseconds;
            dev.log('first token in ${ttftMs}ms', name: 'LocalLlm.perf');
          }
          emitted++;
          yield text;
        case ShiftEvent():
          // Context shift: nothing to surface to the caller. It only shows
          // up when the conversation overflows nCtx, and llama_cpp_dart
          // performs the shift internally per ContextShiftPolicy.
          break;
        case DoneEvent(:final trailingText):
          if (trailingText.isNotEmpty) yield trailingText;
      }
    }
  } finally {
    timer.stop();
    final totalMs = timer.elapsedMilliseconds;
    // Generation time is measured from the first token; zero if none arrived.
    var genMs = 0;
    if (ttftMs >= 0) genMs = totalMs - ttftMs;
    final tps = genMs > 0 ? (emitted * 1000.0) / genMs : 0.0;
    dev.log(
      '$emitted tokens in ${totalMs}ms '
      '(first-token ${ttftMs}ms, '
      'gen ${genMs}ms, ${tps.toStringAsFixed(1)} tok/s)',
      name: 'LocalLlm.perf',
    );
    // Best-effort cleanup: a failing dispose must not keep the gate closed.
    try {
      await session?.dispose();
    } catch (error) {
      dev.log('Error disposing chat: $error', name: 'LocalLlm');
    }
    gate.complete();
  }
}